Skip to content

Commit 9ccba68

Browse files
committed
addressed jark's comments
1 parent 10076a4 commit 9ccba68

File tree

15 files changed

+275
-120
lines changed

15 files changed

+275
-120
lines changed
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.exception;
19+
20+
/**
21+
* Exception thrown when deletion operations are disabled on a table. This exception is used when a
22+
* table has been configured with delete behavior set to 'disable', indicating that deletion
23+
* operations are not allowed and should be rejected.
24+
*
25+
* @see org.apache.fluss.config.ConfigOptions#TABLE_DELETE_BEHAVIOR
26+
*/
27+
public class DeletionDisabledException extends ApiException {
28+
public DeletionDisabledException(String message, Throwable cause) {
29+
super(message, cause);
30+
}
31+
32+
public DeletionDisabledException(String message) {
33+
this(message, null);
34+
}
35+
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@
5959

6060
import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_FORMAT;
6161
import static org.apache.fluss.config.FlussConfigUtils.CLIENT_PREFIX;
62-
import static org.apache.fluss.config.FlussConfigUtils.TABLE_PREFIX;
6362
import static org.apache.fluss.flink.catalog.FlinkCatalog.LAKE_TABLE_SPLITTER;
6463
import static org.apache.fluss.flink.utils.DataLakeUtils.getDatalakeFormat;
6564
import static org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils.getBucketKeyIndexes;
@@ -226,10 +225,10 @@ private static Configuration toFlussClientConfig(
226225
ConfigOptions.BOOTSTRAP_SERVERS.key(),
227226
tableOptions.get(FlinkConnectorOptions.BOOTSTRAP_SERVERS.key()));
228227

229-
// forward all client configs and table configs
228+
// forward all client configs
230229
tableOptions.forEach(
231230
(key, value) -> {
232-
if (key.startsWith(CLIENT_PREFIX) || key.startsWith(TABLE_PREFIX)) {
231+
if (key.startsWith(CLIENT_PREFIX)) {
233232
flussConfig.setString(key, value);
234233
}
235234
});

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkTableSink.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,13 @@
1717

1818
package org.apache.fluss.flink.sink;
1919

20-
import org.apache.fluss.config.ConfigOptions;
2120
import org.apache.fluss.config.Configuration;
2221
import org.apache.fluss.flink.sink.serializer.RowDataSerializationSchema;
2322
import org.apache.fluss.flink.sink.writer.FlinkSinkWriter;
2423
import org.apache.fluss.flink.utils.PushdownUtils;
2524
import org.apache.fluss.flink.utils.PushdownUtils.FieldEqual;
2625
import org.apache.fluss.flink.utils.PushdownUtils.ValueConversion;
2726
import org.apache.fluss.metadata.DataLakeFormat;
28-
import org.apache.fluss.metadata.DeleteBehavior;
2927
import org.apache.fluss.metadata.MergeEngineType;
3028
import org.apache.fluss.metadata.TablePath;
3129
import org.apache.fluss.row.GenericRow;
@@ -362,15 +360,6 @@ private void validateUpdatableAndDeletable() {
362360
"Table %s uses the '%s' merge engine which does not support DELETE or UPDATE statements.",
363361
tablePath, mergeEngineType));
364362
}
365-
366-
// Check table-level delete behavior configuration
367-
DeleteBehavior deleteBehavior = flussConfig.get(ConfigOptions.TABLE_DELETE_BEHAVIOR);
368-
if (deleteBehavior == DeleteBehavior.DISABLE) {
369-
throw new UnsupportedOperationException(
370-
String.format(
371-
"Table %s has delete behavior set to 'disable' which does not support DELETE statements.",
372-
tablePath));
373-
}
374363
}
375364

376365
private Map<Integer, LogicalType> getPrimaryKeyTypes() {

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -206,28 +206,23 @@ public ChangelogMode getChangelogMode() {
206206
} else {
207207
if (hasPrimaryKey()) {
208208
// pk table
209-
if (mergeEngineType == MergeEngineType.FIRST_ROW
210-
|| mergeEngineType == MergeEngineType.VERSIONED) {
209+
if (mergeEngineType == MergeEngineType.FIRST_ROW) {
211210
return ChangelogMode.insertOnly();
212211
} else {
213212
// Check delete behavior configuration
214213
Configuration tableConf = Configuration.fromMap(tableOptions);
215214
DeleteBehavior deleteBehavior =
216215
tableConf.get(ConfigOptions.TABLE_DELETE_BEHAVIOR);
217-
if (deleteBehavior == DeleteBehavior.IGNORE) {
218-
// If delete operations are ignored, only insert and update are relevant
216+
if (deleteBehavior == DeleteBehavior.ALLOW) {
217+
return ChangelogMode.all();
218+
} else {
219+
// If delete operations are ignored or disabled, only insert and update are
220+
// relevant
219221
return ChangelogMode.newBuilder()
220222
.addContainedKind(RowKind.INSERT)
221223
.addContainedKind(RowKind.UPDATE_BEFORE)
222224
.addContainedKind(RowKind.UPDATE_AFTER)
223225
.build();
224-
} else if (deleteBehavior == DeleteBehavior.DISABLE) {
225-
throw new UnsupportedOperationException(
226-
String.format(
227-
"Table %s has delete behavior set to 'disable' which does not support DELETE statements.",
228-
tablePath));
229-
} else {
230-
return ChangelogMode.all();
231226
}
232227
}
233228
} else {

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java

Lines changed: 7 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1096,42 +1096,17 @@ public InsertAndExpectValues(List<String> insertValues, List<String> expectedRow
10961096
}
10971097
}
10981098

1099-
@Test
1100-
void testDeleteBehaviorDisable() {
1101-
String tableName = "delete_behavior_disable_table";
1102-
tBatchEnv.executeSql(
1103-
String.format(
1104-
"create table %s ("
1105-
+ " a int not null,"
1106-
+ " b bigint null, "
1107-
+ " c string null, "
1108-
+ " primary key (a) not enforced"
1109-
+ ") with ('table.delete.behavior' = 'disable')",
1110-
tableName));
1111-
1112-
TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
1113-
assertThatThrownBy(
1114-
() ->
1115-
tBatchEnv
1116-
.executeSql("DELETE FROM " + tableName + " WHERE a = 1")
1117-
.await())
1118-
.isInstanceOf(UnsupportedOperationException.class)
1119-
.hasMessage(
1120-
String.format(
1121-
"Table %s has delete behavior set to 'disable' which does not support DELETE statements.",
1122-
tablePath));
1123-
}
1124-
1125-
@Test
1126-
void testDeleteBehaviorIgnore() throws Exception {
1099+
@ParameterizedTest
1100+
@ValueSource(strings = {"ignore", "disable"})
1101+
void testDeleteBehaviorIgnoreOrDisable(String deleteBehavior) throws Exception {
11271102
String tableName = "delete_behavior_ignore_table";
11281103
tEnv.executeSql(
11291104
String.format(
11301105
"create table %s ("
11311106
+ " a int not null primary key not enforced,"
11321107
+ " b string"
1133-
+ ") with ('table.delete.behavior' = 'ignore')",
1134-
tableName));
1108+
+ ") with ('table.delete.behavior' = '%s')",
1109+
tableName, deleteBehavior));
11351110

11361111
// Insert some data
11371112
tEnv.executeSql(
@@ -1151,7 +1126,8 @@ void testDeleteBehaviorIgnore() throws Exception {
11511126
tEnv.createTemporaryView("changelog_source", changelogData);
11521127

11531128
// Insert changelog data
1154-
tEnv.executeSql(String.format("INSERT INTO %s SELECT * FROM changelog_source", tableName));
1129+
tEnv.executeSql(String.format("INSERT INTO %s SELECT * FROM changelog_source", tableName))
1130+
.await();
11551131

11561132
CloseableIterator<Row> rowIter =
11571133
tEnv.executeSql(String.format("select * from %s", tableName)).collect();

fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@
2121
import org.apache.fluss.compression.ArrowCompressionInfo;
2222
import org.apache.fluss.config.ConfigOptions;
2323
import org.apache.fluss.config.Configuration;
24+
import org.apache.fluss.exception.DeletionDisabledException;
2425
import org.apache.fluss.exception.KvStorageException;
2526
import org.apache.fluss.memory.MemorySegmentPool;
27+
import org.apache.fluss.metadata.DeleteBehavior;
2628
import org.apache.fluss.metadata.KvFormat;
2729
import org.apache.fluss.metadata.LogFormat;
2830
import org.apache.fluss.metadata.PhysicalTablePath;
@@ -277,9 +279,14 @@ public LogAppendInfo putAsLeader(KvRecordBatch kvRecords, @Nullable int[] target
277279
byte[] keyBytes = BytesUtils.toArray(kvRecord.getKey());
278280
KvPreWriteBuffer.Key key = KvPreWriteBuffer.Key.of(keyBytes);
279281
if (kvRecord.getRow() == null) {
280-
if (!rowMerger.supportsDelete()) {
282+
DeleteBehavior deleteBehavior = rowMerger.deleteBehavior();
283+
if (deleteBehavior == DeleteBehavior.IGNORE) {
281284
// skip delete rows if the merger doesn't support yet
282285
continue;
286+
} else if (deleteBehavior == DeleteBehavior.DISABLE) {
287+
throw new DeletionDisabledException(
288+
"Delete operations are disabled for this table. "
289+
+ "The table.delete.behavior is set to 'disable'.");
283290
}
284291
// it's for deletion
285292
byte[] oldValue = getFromBufferOrKv(key);

fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/DefaultRowMerger.java

Lines changed: 7 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -59,26 +59,13 @@ public BinaryRow merge(BinaryRow oldRow, BinaryRow newRow) {
5959
@Nullable
6060
@Override
6161
public BinaryRow delete(BinaryRow oldRow) {
62-
switch (deleteBehavior) {
63-
case ALLOW:
64-
// returns null to indicate the row is deleted
65-
return null;
66-
case IGNORE:
67-
// returns the old row unchanged to ignore the delete operation
68-
return oldRow;
69-
case DISABLE:
70-
throw new UnsupportedOperationException(
71-
"Delete operations are disabled for this table. "
72-
+ "The table.delete.behavior is set to 'disable'.");
73-
default:
74-
throw new IllegalArgumentException(
75-
"Unsupported delete behavior: " + deleteBehavior);
76-
}
62+
// returns null to indicate the row is deleted
63+
return null;
7764
}
7865

7966
@Override
80-
public boolean supportsDelete() {
81-
return deleteBehavior != DeleteBehavior.DISABLE;
67+
public DeleteBehavior deleteBehavior() {
68+
return deleteBehavior;
8269
}
8370

8471
@Override
@@ -120,25 +107,12 @@ public BinaryRow merge(BinaryRow oldRow, BinaryRow newRow) {
120107
@Nullable
121108
@Override
122109
public BinaryRow delete(BinaryRow oldRow) {
123-
switch (deleteBehavior) {
124-
case ALLOW:
125-
return partialUpdater.deleteRow(oldRow);
126-
case IGNORE:
127-
// returns the old row unchanged to ignore the delete operation
128-
return oldRow;
129-
case DISABLE:
130-
throw new UnsupportedOperationException(
131-
"Delete operations are disabled for this table. "
132-
+ "The table.delete.behavior is set to 'disable'.");
133-
default:
134-
throw new IllegalArgumentException(
135-
"Unsupported delete behavior: " + deleteBehavior);
136-
}
110+
return partialUpdater.deleteRow(oldRow);
137111
}
138112

139113
@Override
140-
public boolean supportsDelete() {
141-
return deleteBehavior != DeleteBehavior.DISABLE;
114+
public DeleteBehavior deleteBehavior() {
115+
return deleteBehavior;
142116
}
143117
}
144118
}

fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/FirstRowRowMerger.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.fluss.server.kv.rowmerger;
1919

20+
import org.apache.fluss.metadata.DeleteBehavior;
2021
import org.apache.fluss.metadata.MergeEngineType;
2122
import org.apache.fluss.row.BinaryRow;
2223

@@ -29,6 +30,12 @@
2930
*/
3031
public class FirstRowRowMerger implements RowMerger {
3132

33+
private final DeleteBehavior deleteBehavior;
34+
35+
public FirstRowRowMerger(DeleteBehavior deleteBehavior) {
36+
this.deleteBehavior = deleteBehavior;
37+
}
38+
3239
@Nullable
3340
@Override
3441
public BinaryRow merge(BinaryRow oldRow, BinaryRow newRow) {
@@ -44,8 +51,8 @@ public BinaryRow delete(BinaryRow oldRow) {
4451
}
4552

4653
@Override
47-
public boolean supportsDelete() {
48-
return false;
54+
public DeleteBehavior deleteBehavior() {
55+
return deleteBehavior;
4956
}
5057

5158
@Override

fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/RowMerger.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ public interface RowMerger {
4545
/**
4646
* Merge the old row with a delete row.
4747
*
48-
* <p>This method will be invoked only when {@link #supportsDelete()} returns true.
48+
* <p>This method will be invoked only when {@link #deleteBehavior()} returns {@link
49+
* DeleteBehavior#ALLOW}.
4950
*
5051
* @param oldRow the old row.
5152
* @return the merged row, or null if the row is deleted.
@@ -54,11 +55,11 @@ public interface RowMerger {
5455
BinaryRow delete(BinaryRow oldRow);
5556

5657
/**
57-
* Whether the merger supports to merge delete rows.
58+
* The behavior of delete operations on primary key tables.
5859
*
59-
* @return true if the merger supports delete operation.
60+
* @return {@link DeleteBehavior}
6061
*/
61-
boolean supportsDelete();
62+
DeleteBehavior deleteBehavior();
6263

6364
/** Dynamically configure the target columns to merge and return the effective merger. */
6465
RowMerger configureTargetColumns(@Nullable int[] targetColumns);
@@ -71,7 +72,7 @@ static RowMerger create(TableConfig tableConf, Schema schema, KvFormat kvFormat)
7172
if (mergeEngineType.isPresent()) {
7273
switch (mergeEngineType.get()) {
7374
case FIRST_ROW:
74-
return new FirstRowRowMerger();
75+
return new FirstRowRowMerger(deleteBehavior);
7576
case VERSIONED:
7677
Optional<String> versionColumn = tableConf.getMergeEngineVersionColumn();
7778
if (!versionColumn.isPresent()) {
@@ -80,7 +81,8 @@ static RowMerger create(TableConfig tableConf, Schema schema, KvFormat kvFormat)
8081
"'%s' must be set for versioned merge engine.",
8182
ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN.key()));
8283
}
83-
return new VersionedRowMerger(schema.getRowType(), versionColumn.get());
84+
return new VersionedRowMerger(
85+
schema.getRowType(), versionColumn.get(), deleteBehavior);
8486
default:
8587
throw new IllegalArgumentException(
8688
"Unsupported merge engine type: " + mergeEngineType.get());

fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/VersionedRowMerger.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.fluss.server.kv.rowmerger;
1919

20+
import org.apache.fluss.metadata.DeleteBehavior;
2021
import org.apache.fluss.metadata.MergeEngineType;
2122
import org.apache.fluss.row.BinaryRow;
2223
import org.apache.fluss.row.TimestampLtz;
@@ -46,8 +47,12 @@ public class VersionedRowMerger implements RowMerger {
4647

4748
private final Comparator<BinaryRow> versionComparator;
4849

49-
public VersionedRowMerger(RowType schema, String versionColumnName) {
50+
private final DeleteBehavior deleteBehavior;
51+
52+
public VersionedRowMerger(
53+
RowType schema, String versionColumnName, DeleteBehavior deleteBehavior) {
5054
this.versionComparator = createVersionComparator(schema, versionColumnName);
55+
this.deleteBehavior = deleteBehavior;
5156
}
5257

5358
@Nullable
@@ -65,8 +70,8 @@ public BinaryRow delete(BinaryRow oldRow) {
6570
}
6671

6772
@Override
68-
public boolean supportsDelete() {
69-
return false;
73+
public DeleteBehavior deleteBehavior() {
74+
return deleteBehavior;
7075
}
7176

7277
@Override

0 commit comments

Comments
 (0)