Skip to content

Commit 09615c9

Browse files
committed
Jarks' review
1 parent 2e288af commit 09615c9

File tree

14 files changed

+239
-264
lines changed

14 files changed

+239
-264
lines changed

fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.apache.fluss.fs.FsPathAndFileName;
5151
import org.apache.fluss.metadata.DatabaseDescriptor;
5252
import org.apache.fluss.metadata.DatabaseInfo;
53+
import org.apache.fluss.metadata.DeleteBehavior;
5354
import org.apache.fluss.metadata.KvFormat;
5455
import org.apache.fluss.metadata.LogFormat;
5556
import org.apache.fluss.metadata.PartitionInfo;
@@ -86,6 +87,7 @@
8687

8788
import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT;
8889
import static org.apache.fluss.metadata.DataLakeFormat.PAIMON;
90+
import static org.apache.fluss.record.TestData.DATA1_SCHEMA;
8991
import static org.apache.fluss.testutils.DataTestUtils.row;
9092
import static org.assertj.core.api.Assertions.assertThat;
9193
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -311,6 +313,92 @@ void testCreateInvalidDatabaseAndTable() {
311313
.hasMessageContaining("Database name null is invalid: null string is not allowed");
312314
}
313315

316+
@Test
317+
void testCreateTableWithDeleteBehavior() {
318+
// Test 1: FIRST_ROW merge engine - should set delete behavior to IGNORE
319+
TablePath tablePath1 = TablePath.of("fluss", "test_ignore_delete_for_first_row");
320+
Map<String, String> properties1 = new HashMap<>();
321+
properties1.put(ConfigOptions.TABLE_MERGE_ENGINE.key(), "first_row");
322+
323+
TableDescriptor tableDescriptor1 =
324+
TableDescriptor.builder()
325+
.schema(DEFAULT_SCHEMA)
326+
.comment("first row merge engine table")
327+
.properties(properties1)
328+
.build();
329+
admin.createTable(tablePath1, tableDescriptor1, false).join();
330+
331+
// Get the table and verify delete behavior is changed to IGNORE
332+
TableInfo tableInfo1 = admin.getTableInfo(tablePath1).join();
333+
assertThat(tableInfo1.getTableConfig().getDeleteBehavior()).hasValue(DeleteBehavior.IGNORE);
334+
335+
// Test 2: VERSIONED merge engine - should set delete behavior to IGNORE
336+
TablePath tablePath2 = TablePath.of("fluss", "test_ignore_delete_for_versioned");
337+
Map<String, String> properties2 = new HashMap<>();
338+
properties2.put(ConfigOptions.TABLE_MERGE_ENGINE.key(), "versioned");
339+
properties2.put(ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN.key(), "age");
340+
TableDescriptor tableDescriptor2 =
341+
TableDescriptor.builder()
342+
.schema(DEFAULT_SCHEMA)
343+
.comment("versioned merge engine table")
344+
.properties(properties2)
345+
.build();
346+
admin.createTable(tablePath2, tableDescriptor2, false).join();
347+
// Get the table and verify delete behavior is changed to IGNORE
348+
TableInfo tableInfo2 = admin.getTableInfo(tablePath2).join();
349+
assertThat(tableInfo2.getTableConfig().getDeleteBehavior()).hasValue(DeleteBehavior.IGNORE);
350+
351+
// Test 3: FIRST_ROW merge engine with delete behavior explicitly set to ALLOW
352+
TablePath tablePath3 = TablePath.of("fluss", "test_allow_delete_for_first_row");
353+
Map<String, String> properties3 = new HashMap<>();
354+
properties3.put(ConfigOptions.TABLE_MERGE_ENGINE.key(), "first_row");
355+
properties3.put(ConfigOptions.TABLE_DELETE_BEHAVIOR.key(), "ALLOW");
356+
TableDescriptor tableDescriptor3 =
357+
TableDescriptor.builder()
358+
.schema(DEFAULT_SCHEMA)
359+
.comment("first row merge engine table")
360+
.properties(properties3)
361+
.build();
362+
assertThatThrownBy(() -> admin.createTable(tablePath3, tableDescriptor3, false).join())
363+
.hasRootCauseInstanceOf(InvalidConfigException.class)
364+
.hasMessageContaining(
365+
"Table with 'FIRST_ROW' merge engine does not support delete operations. "
366+
+ "The delete behavior must be set to 'ignore' or 'disable', but got 'allow'.");
367+
368+
// Test 4: VERSIONED merge engine with delete behavior explicitly set to ALLOW
369+
TablePath tablePath4 = TablePath.of("fluss", "test_allow_delete_for_versioned");
370+
Map<String, String> properties4 = new HashMap<>();
371+
properties4.put(ConfigOptions.TABLE_MERGE_ENGINE.key(), "versioned");
372+
properties4.put(ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN.key(), "age");
373+
properties4.put(ConfigOptions.TABLE_DELETE_BEHAVIOR.key(), "ALLOW");
374+
TableDescriptor tableDescriptor4 =
375+
TableDescriptor.builder()
376+
.schema(DEFAULT_SCHEMA)
377+
.comment("versioned merge engine table")
378+
.properties(properties4)
379+
.build();
380+
assertThatThrownBy(() -> admin.createTable(tablePath4, tableDescriptor4, false).join())
381+
.hasRootCauseInstanceOf(InvalidConfigException.class)
382+
.hasMessageContaining(
383+
"Table with 'VERSIONED' merge engine does not support delete operations. "
384+
+ "The delete behavior must be set to 'ignore' or 'disable', but got 'allow'.");
385+
386+
// Test 5: Log table - not allow to set delete behavior
387+
TablePath tablePath5 = TablePath.of("fluss", "test_set_delete_behavior_for_log_table");
388+
Map<String, String> properties5 = new HashMap<>();
389+
properties5.put(ConfigOptions.TABLE_DELETE_BEHAVIOR.key(), "IGNORE");
390+
TableDescriptor tableDescriptor5 =
391+
TableDescriptor.builder()
392+
.schema(DATA1_SCHEMA)
393+
.comment("log table")
394+
.properties(properties5)
395+
.build();
396+
assertThatThrownBy(() -> admin.createTable(tablePath5, tableDescriptor5, false).join())
397+
.hasRootCauseInstanceOf(InvalidConfigException.class)
398+
.hasMessageContaining(
399+
"Delete behavior configuration is only supported for primary key tables.");
400+
}
401+
314402
@Test
315403
void testCreateTableWithInvalidProperty() {
316404
TablePath tablePath = TablePath.of(DEFAULT_TABLE_PATH.getDatabaseName(), "test_property");

fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,8 @@ public Optional<String> getMergeEngineVersionColumn() {
113113
}
114114

115115
/** Gets the delete behavior of the table. */
116-
public DeleteBehavior getDeleteBehavior() {
117-
return config.get(ConfigOptions.TABLE_DELETE_BEHAVIOR);
116+
public Optional<DeleteBehavior> getDeleteBehavior() {
117+
return config.getOptional(ConfigOptions.TABLE_DELETE_BEHAVIOR);
118118
}
119119

120120
/** Gets the Arrow compression type and compression level of the table. */

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import java.util.Set;
6060

6161
import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_FORMAT;
62+
import static org.apache.fluss.config.ConfigOptions.TABLE_DELETE_BEHAVIOR;
6263
import static org.apache.fluss.config.FlussConfigUtils.CLIENT_PREFIX;
6364
import static org.apache.fluss.flink.catalog.FlinkCatalog.LAKE_TABLE_SPLITTER;
6465
import static org.apache.fluss.flink.utils.DataLakeUtils.getDatalakeFormat;
@@ -187,6 +188,7 @@ public DynamicTableSink createDynamicTableSink(Context context) {
187188
tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)),
188189
tableOptions.get(toFlinkOption(TABLE_DATALAKE_FORMAT)),
189190
tableOptions.get(FlinkConnectorOptions.SINK_IGNORE_DELETE),
191+
tableOptions.get(toFlinkOption(TABLE_DELETE_BEHAVIOR)),
190192
tableOptions.get(FlinkConnectorOptions.BUCKET_NUMBER),
191193
getBucketKeys(tableOptions),
192194
tableOptions.get(FlinkConnectorOptions.SINK_BUCKET_SHUFFLE));

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkTableSink.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.fluss.flink.utils.PushdownUtils.FieldEqual;
2525
import org.apache.fluss.flink.utils.PushdownUtils.ValueConversion;
2626
import org.apache.fluss.metadata.DataLakeFormat;
27+
import org.apache.fluss.metadata.DeleteBehavior;
2728
import org.apache.fluss.metadata.MergeEngineType;
2829
import org.apache.fluss.metadata.TablePath;
2930
import org.apache.fluss.row.GenericRow;
@@ -74,7 +75,8 @@ public class FlinkTableSink
7475
private final List<String> partitionKeys;
7576
private final boolean streaming;
7677
@Nullable private final MergeEngineType mergeEngineType;
77-
private final boolean ignoreDelete;
78+
private final boolean sinkIgnoreDelete;
79+
private final DeleteBehavior tableDeleteBehavior;
7880
private final int numBucket;
7981
private final List<String> bucketKeys;
8082
private final boolean shuffleByBucketId;
@@ -92,7 +94,8 @@ public FlinkTableSink(
9294
boolean streaming,
9395
@Nullable MergeEngineType mergeEngineType,
9496
@Nullable DataLakeFormat lakeFormat,
95-
boolean ignoreDelete,
97+
boolean sinkIgnoreDelete,
98+
DeleteBehavior tableDeleteBehavior,
9699
int numBucket,
97100
List<String> bucketKeys,
98101
boolean shuffleByBucketId) {
@@ -103,7 +106,8 @@ public FlinkTableSink(
103106
this.partitionKeys = partitionKeys;
104107
this.streaming = streaming;
105108
this.mergeEngineType = mergeEngineType;
106-
this.ignoreDelete = ignoreDelete;
109+
this.sinkIgnoreDelete = sinkIgnoreDelete;
110+
this.tableDeleteBehavior = tableDeleteBehavior;
107111
this.numBucket = numBucket;
108112
this.bucketKeys = bucketKeys;
109113
this.shuffleByBucketId = shuffleByBucketId;
@@ -115,7 +119,7 @@ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
115119
if (!streaming) {
116120
return ChangelogMode.insertOnly();
117121
} else {
118-
if (primaryKeyIndexes.length > 0 || ignoreDelete) {
122+
if (primaryKeyIndexes.length > 0 || sinkIgnoreDelete) {
119123
// primary-key table or ignore_delete mode can accept RowKind.DELETE
120124
ChangelogMode.Builder builder = ChangelogMode.newBuilder();
121125
for (RowKind kind : requestedMode.getContainedKinds()) {
@@ -200,7 +204,7 @@ private FlinkSink<RowData> getFlinkSink(int[] targetColumnIndexes) {
200204
partitionKeys,
201205
lakeFormat,
202206
shuffleByBucketId,
203-
new RowDataSerializationSchema(false, ignoreDelete))
207+
new RowDataSerializationSchema(false, sinkIgnoreDelete))
204208
: new FlinkSink.AppendSinkWriterBuilder<>(
205209
tablePath,
206210
flussConfig,
@@ -210,7 +214,7 @@ private FlinkSink<RowData> getFlinkSink(int[] targetColumnIndexes) {
210214
partitionKeys,
211215
lakeFormat,
212216
shuffleByBucketId,
213-
new RowDataSerializationSchema(true, ignoreDelete));
217+
new RowDataSerializationSchema(true, sinkIgnoreDelete));
214218

215219
return new FlinkSink<>(flinkSinkWriterBuilder);
216220
}
@@ -235,7 +239,8 @@ public DynamicTableSink copy() {
235239
streaming,
236240
mergeEngineType,
237241
lakeFormat,
238-
ignoreDelete,
242+
sinkIgnoreDelete,
243+
tableDeleteBehavior,
239244
numBucket,
240245
bucketKeys,
241246
shuffleByBucketId);
@@ -360,6 +365,14 @@ private void validateUpdatableAndDeletable() {
360365
"Table %s uses the '%s' merge engine which does not support DELETE or UPDATE statements.",
361366
tablePath, mergeEngineType));
362367
}
368+
369+
// Check table-level delete behavior configuration
370+
if (tableDeleteBehavior == DeleteBehavior.DISABLE) {
371+
throw new UnsupportedOperationException(
372+
String.format(
373+
"Table %s has delete behavior set to 'disable' which does not support DELETE statements.",
374+
tablePath));
375+
}
363376
}
364377

365378
private Map<Integer, LogicalType> getPrimaryKeyTypes() {

0 commit comments

Comments
 (0)