Skip to content

Commit 3ec81a8

Browse files
authored
[kv] Support 'table.delete.behavior' config to disable or ignore deletion on the table (#1783)
1 parent 98af40c commit 3ec81a8

File tree

20 files changed

+568
-32
lines changed

20 files changed

+568
-32
lines changed

fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.apache.fluss.fs.FsPathAndFileName;
5151
import org.apache.fluss.metadata.DatabaseDescriptor;
5252
import org.apache.fluss.metadata.DatabaseInfo;
53+
import org.apache.fluss.metadata.DeleteBehavior;
5354
import org.apache.fluss.metadata.KvFormat;
5455
import org.apache.fluss.metadata.LogFormat;
5556
import org.apache.fluss.metadata.PartitionInfo;
@@ -86,6 +87,7 @@
8687

8788
import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT;
8889
import static org.apache.fluss.metadata.DataLakeFormat.PAIMON;
90+
import static org.apache.fluss.record.TestData.DATA1_SCHEMA;
8991
import static org.apache.fluss.testutils.DataTestUtils.row;
9092
import static org.assertj.core.api.Assertions.assertThat;
9193
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -331,6 +333,92 @@ void testCreateInvalidDatabaseAndTable() throws Exception {
331333
"Table name __internal_table is invalid: '__' is not allowed as prefix, since it is reserved for internal databases/internal tables/internal partitions in Fluss server");
332334
}
333335

336+
@Test
337+
void testCreateTableWithDeleteBehavior() {
338+
// Test 1: FIRST_ROW merge engine - should set delete behavior to IGNORE
339+
TablePath tablePath1 = TablePath.of("fluss", "test_ignore_delete_for_first_row");
340+
Map<String, String> properties1 = new HashMap<>();
341+
properties1.put(ConfigOptions.TABLE_MERGE_ENGINE.key(), "first_row");
342+
343+
TableDescriptor tableDescriptor1 =
344+
TableDescriptor.builder()
345+
.schema(DEFAULT_SCHEMA)
346+
.comment("first row merge engine table")
347+
.properties(properties1)
348+
.build();
349+
admin.createTable(tablePath1, tableDescriptor1, false).join();
350+
351+
// Get the table and verify delete behavior is changed to IGNORE
352+
TableInfo tableInfo1 = admin.getTableInfo(tablePath1).join();
353+
assertThat(tableInfo1.getTableConfig().getDeleteBehavior()).hasValue(DeleteBehavior.IGNORE);
354+
355+
// Test 2: VERSIONED merge engine - should set delete behavior to IGNORE
356+
TablePath tablePath2 = TablePath.of("fluss", "test_ignore_delete_for_versioned");
357+
Map<String, String> properties2 = new HashMap<>();
358+
properties2.put(ConfigOptions.TABLE_MERGE_ENGINE.key(), "versioned");
359+
properties2.put(ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN.key(), "age");
360+
TableDescriptor tableDescriptor2 =
361+
TableDescriptor.builder()
362+
.schema(DEFAULT_SCHEMA)
363+
.comment("versioned merge engine table")
364+
.properties(properties2)
365+
.build();
366+
admin.createTable(tablePath2, tableDescriptor2, false).join();
367+
// Get the table and verify delete behavior is changed to IGNORE
368+
TableInfo tableInfo2 = admin.getTableInfo(tablePath2).join();
369+
assertThat(tableInfo2.getTableConfig().getDeleteBehavior()).hasValue(DeleteBehavior.IGNORE);
370+
371+
// Test 3: FIRST_ROW merge engine with delete behavior explicitly set to ALLOW
372+
TablePath tablePath3 = TablePath.of("fluss", "test_allow_delete_for_first_row");
373+
Map<String, String> properties3 = new HashMap<>();
374+
properties3.put(ConfigOptions.TABLE_MERGE_ENGINE.key(), "first_row");
375+
properties3.put(ConfigOptions.TABLE_DELETE_BEHAVIOR.key(), "ALLOW");
376+
TableDescriptor tableDescriptor3 =
377+
TableDescriptor.builder()
378+
.schema(DEFAULT_SCHEMA)
379+
.comment("first row merge engine table")
380+
.properties(properties3)
381+
.build();
382+
assertThatThrownBy(() -> admin.createTable(tablePath3, tableDescriptor3, false).join())
383+
.hasRootCauseInstanceOf(InvalidConfigException.class)
384+
.hasMessageContaining(
385+
"Table with 'FIRST_ROW' merge engine does not support delete operations. "
386+
+ "The 'table.delete.behavior' config must be set to 'ignore' or 'disable', but got 'allow'.");
387+
388+
// Test 4: VERSIONED merge engine with delete behavior explicitly set to ALLOW
389+
TablePath tablePath4 = TablePath.of("fluss", "test_allow_delete_for_versioned");
390+
Map<String, String> properties4 = new HashMap<>();
391+
properties4.put(ConfigOptions.TABLE_MERGE_ENGINE.key(), "versioned");
392+
properties4.put(ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN.key(), "age");
393+
properties4.put(ConfigOptions.TABLE_DELETE_BEHAVIOR.key(), "ALLOW");
394+
TableDescriptor tableDescriptor4 =
395+
TableDescriptor.builder()
396+
.schema(DEFAULT_SCHEMA)
397+
.comment("versioned merge engine table")
398+
.properties(properties4)
399+
.build();
400+
assertThatThrownBy(() -> admin.createTable(tablePath4, tableDescriptor4, false).join())
401+
.hasRootCauseInstanceOf(InvalidConfigException.class)
402+
.hasMessageContaining(
403+
"Table with 'VERSIONED' merge engine does not support delete operations. "
404+
+ "The 'table.delete.behavior' config must be set to 'ignore' or 'disable', but got 'allow'.");
405+
406+
// Test 5: Log table - not allow to set delete behavior
407+
TablePath tablePath5 = TablePath.of("fluss", "test_set_delete_behavior_for_log_table");
408+
Map<String, String> properties5 = new HashMap<>();
409+
properties5.put(ConfigOptions.TABLE_DELETE_BEHAVIOR.key(), "IGNORE");
410+
TableDescriptor tableDescriptor5 =
411+
TableDescriptor.builder()
412+
.schema(DATA1_SCHEMA)
413+
.comment("log table")
414+
.properties(properties5)
415+
.build();
416+
assertThatThrownBy(() -> admin.createTable(tablePath5, tableDescriptor5, false).join())
417+
.hasRootCauseInstanceOf(InvalidConfigException.class)
418+
.hasMessageContaining(
419+
"The 'table.delete.behavior' configuration is only supported for primary key tables.");
420+
}
421+
334422
@Test
335423
void testCreateTableWithInvalidProperty() {
336424
TablePath tablePath = TablePath.of(DEFAULT_TABLE_PATH.getDatabaseName(), "test_property");

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.fluss.annotation.PublicEvolving;
2222
import org.apache.fluss.compression.ArrowCompressionType;
2323
import org.apache.fluss.metadata.DataLakeFormat;
24+
import org.apache.fluss.metadata.DeleteBehavior;
2425
import org.apache.fluss.metadata.KvFormat;
2526
import org.apache.fluss.metadata.LogFormat;
2627
import org.apache.fluss.metadata.MergeEngineType;
@@ -1381,6 +1382,18 @@ public class ConfigOptions {
13811382
"The column name of the version column for the `versioned` merge engine. "
13821383
+ "If the merge engine is set to `versioned`, the version column must be set.");
13831384

1385+
public static final ConfigOption<DeleteBehavior> TABLE_DELETE_BEHAVIOR =
1386+
key("table.delete.behavior")
1387+
.enumType(DeleteBehavior.class)
1388+
.defaultValue(DeleteBehavior.ALLOW)
1389+
.withDescription(
1390+
"Defines the delete behavior for the primary key table. "
1391+
+ "The supported delete behaviors are `allow`, `ignore`, and `disable`. "
1392+
+ "The `allow` behavior allows normal delete operations (default). "
1393+
+ "The `ignore` behavior silently skips delete requests without error. "
1394+
+ "The `disable` behavior rejects delete requests with a clear error message. "
1395+
+ "For tables with FIRST_ROW or VERSIONED merge engines, this option defaults to `ignore`.");
1396+
13841397
// ------------------------------------------------------------------------
13851398
// ConfigOptions for Kv
13861399
// ------------------------------------------------------------------------

fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.fluss.annotation.PublicEvolving;
2121
import org.apache.fluss.compression.ArrowCompressionInfo;
2222
import org.apache.fluss.metadata.DataLakeFormat;
23+
import org.apache.fluss.metadata.DeleteBehavior;
2324
import org.apache.fluss.metadata.KvFormat;
2425
import org.apache.fluss.metadata.LogFormat;
2526
import org.apache.fluss.metadata.MergeEngineType;
@@ -111,6 +112,11 @@ public Optional<String> getMergeEngineVersionColumn() {
111112
return config.getOptional(ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN);
112113
}
113114

115+
/** Gets the delete behavior of the table. */
116+
public Optional<DeleteBehavior> getDeleteBehavior() {
117+
return config.getOptional(ConfigOptions.TABLE_DELETE_BEHAVIOR);
118+
}
119+
114120
/** Gets the Arrow compression type and compression level of the table. */
115121
public ArrowCompressionInfo getArrowCompressionInfo() {
116122
return ArrowCompressionInfo.fromConf(config);
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.exception;
19+
20+
/**
21+
* Exception thrown when deletion operations are disabled on a table. This exception is used when a
22+
* table has been configured with delete behavior set to 'disable', indicating that deletion
23+
* operations are not allowed and should be rejected.
24+
*
25+
* @see org.apache.fluss.config.ConfigOptions#TABLE_DELETE_BEHAVIOR
26+
*/
27+
public class DeletionDisabledException extends ApiException {
28+
public DeletionDisabledException(String message, Throwable cause) {
29+
super(message, cause);
30+
}
31+
32+
public DeletionDisabledException(String message) {
33+
this(message, null);
34+
}
35+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.metadata;
19+
20+
/**
21+
* The delete behavior for the primary key table.
22+
*
23+
* <p>This enum defines how delete operations should be handled for primary key tables. It provides
24+
* different strategies to control whether deletions are allowed, ignored, or explicitly disabled.
25+
*
26+
* @since 0.8
27+
*/
28+
public enum DeleteBehavior {
29+
30+
/**
31+
* Allow normal delete operations. This is the default behavior for primary key tables without
32+
* merge engines.
33+
*/
34+
ALLOW,
35+
36+
/**
37+
* Silently ignore delete requests without error. Delete operations will be dropped at the
38+
* server side, and no deletion will be performed. This is the default behavior for tables with
39+
* FIRST_ROW or VERSIONED merge engines.
40+
*/
41+
IGNORE,
42+
43+
/**
44+
* Reject delete requests with a clear error message. Any attempt to perform delete operations
45+
* will result in an exception being thrown.
46+
*/
47+
DISABLE;
48+
49+
/** Creates a {@link DeleteBehavior} from the given string. */
50+
public static DeleteBehavior fromString(String behavior) {
51+
switch (behavior.toUpperCase()) {
52+
case "ALLOW":
53+
return ALLOW;
54+
case "IGNORE":
55+
return IGNORE;
56+
case "DISABLE":
57+
return DISABLE;
58+
default:
59+
throw new IllegalArgumentException("Unsupported delete behavior: " + behavior);
60+
}
61+
}
62+
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import java.util.Set;
6060

6161
import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_FORMAT;
62+
import static org.apache.fluss.config.ConfigOptions.TABLE_DELETE_BEHAVIOR;
6263
import static org.apache.fluss.config.FlussConfigUtils.CLIENT_PREFIX;
6364
import static org.apache.fluss.flink.catalog.FlinkCatalog.LAKE_TABLE_SPLITTER;
6465
import static org.apache.fluss.flink.utils.DataLakeUtils.getDatalakeFormat;
@@ -187,6 +188,7 @@ public DynamicTableSink createDynamicTableSink(Context context) {
187188
tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)),
188189
tableOptions.get(toFlinkOption(TABLE_DATALAKE_FORMAT)),
189190
tableOptions.get(FlinkConnectorOptions.SINK_IGNORE_DELETE),
191+
tableOptions.get(toFlinkOption(TABLE_DELETE_BEHAVIOR)),
190192
tableOptions.get(FlinkConnectorOptions.BUCKET_NUMBER),
191193
getBucketKeys(tableOptions),
192194
tableOptions.get(FlinkConnectorOptions.SINK_BUCKET_SHUFFLE));

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkTableSink.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.fluss.flink.utils.PushdownUtils.FieldEqual;
2525
import org.apache.fluss.flink.utils.PushdownUtils.ValueConversion;
2626
import org.apache.fluss.metadata.DataLakeFormat;
27+
import org.apache.fluss.metadata.DeleteBehavior;
2728
import org.apache.fluss.metadata.MergeEngineType;
2829
import org.apache.fluss.metadata.TablePath;
2930
import org.apache.fluss.row.GenericRow;
@@ -74,7 +75,8 @@ public class FlinkTableSink
7475
private final List<String> partitionKeys;
7576
private final boolean streaming;
7677
@Nullable private final MergeEngineType mergeEngineType;
77-
private final boolean ignoreDelete;
78+
private final boolean sinkIgnoreDelete;
79+
private final DeleteBehavior tableDeleteBehavior;
7880
private final int numBucket;
7981
private final List<String> bucketKeys;
8082
private final boolean shuffleByBucketId;
@@ -92,7 +94,8 @@ public FlinkTableSink(
9294
boolean streaming,
9395
@Nullable MergeEngineType mergeEngineType,
9496
@Nullable DataLakeFormat lakeFormat,
95-
boolean ignoreDelete,
97+
boolean sinkIgnoreDelete,
98+
DeleteBehavior tableDeleteBehavior,
9699
int numBucket,
97100
List<String> bucketKeys,
98101
boolean shuffleByBucketId) {
@@ -103,7 +106,8 @@ public FlinkTableSink(
103106
this.partitionKeys = partitionKeys;
104107
this.streaming = streaming;
105108
this.mergeEngineType = mergeEngineType;
106-
this.ignoreDelete = ignoreDelete;
109+
this.sinkIgnoreDelete = sinkIgnoreDelete;
110+
this.tableDeleteBehavior = tableDeleteBehavior;
107111
this.numBucket = numBucket;
108112
this.bucketKeys = bucketKeys;
109113
this.shuffleByBucketId = shuffleByBucketId;
@@ -115,7 +119,7 @@ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
115119
if (!streaming) {
116120
return ChangelogMode.insertOnly();
117121
} else {
118-
if (primaryKeyIndexes.length > 0 || ignoreDelete) {
122+
if (primaryKeyIndexes.length > 0 || sinkIgnoreDelete) {
119123
// primary-key table or ignore_delete mode can accept RowKind.DELETE
120124
ChangelogMode.Builder builder = ChangelogMode.newBuilder();
121125
for (RowKind kind : requestedMode.getContainedKinds()) {
@@ -200,7 +204,7 @@ private FlinkSink<RowData> getFlinkSink(int[] targetColumnIndexes) {
200204
partitionKeys,
201205
lakeFormat,
202206
shuffleByBucketId,
203-
new RowDataSerializationSchema(false, ignoreDelete))
207+
new RowDataSerializationSchema(false, sinkIgnoreDelete))
204208
: new FlinkSink.AppendSinkWriterBuilder<>(
205209
tablePath,
206210
flussConfig,
@@ -210,7 +214,7 @@ private FlinkSink<RowData> getFlinkSink(int[] targetColumnIndexes) {
210214
partitionKeys,
211215
lakeFormat,
212216
shuffleByBucketId,
213-
new RowDataSerializationSchema(true, ignoreDelete));
217+
new RowDataSerializationSchema(true, sinkIgnoreDelete));
214218

215219
return new FlinkSink<>(flinkSinkWriterBuilder);
216220
}
@@ -235,7 +239,8 @@ public DynamicTableSink copy() {
235239
streaming,
236240
mergeEngineType,
237241
lakeFormat,
238-
ignoreDelete,
242+
sinkIgnoreDelete,
243+
tableDeleteBehavior,
239244
numBucket,
240245
bucketKeys,
241246
shuffleByBucketId);
@@ -360,6 +365,14 @@ private void validateUpdatableAndDeletable() {
360365
"Table %s uses the '%s' merge engine which does not support DELETE or UPDATE statements.",
361366
tablePath, mergeEngineType));
362367
}
368+
369+
// Check table-level delete behavior configuration
370+
if (tableDeleteBehavior == DeleteBehavior.DISABLE) {
371+
throw new UnsupportedOperationException(
372+
String.format(
373+
"Table %s has delete behavior set to 'disable' which does not support DELETE statements.",
374+
tablePath));
375+
}
363376
}
364377

365378
private Map<Integer, LogicalType> getPrimaryKeyTypes() {

0 commit comments

Comments
 (0)