Skip to content

Commit 69fe1d8

Browse files
committed
constrain upsert behavior for auto increment column
1 parent 453d64b commit 69fe1d8

File tree

2 files changed

+49
-2
lines changed

2 files changed

+49
-2
lines changed

fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,11 @@ class UpsertWriterImpl extends AbstractTableWriter implements UpsertWriter {
6161
WriterClient writerClient) {
6262
super(tablePath, tableInfo, writerClient);
6363
RowType rowType = tableInfo.getRowType();
64-
sanityCheck(rowType, tableInfo.getPrimaryKeys(), partialUpdateColumns);
64+
sanityCheck(
65+
rowType,
66+
tableInfo.getPrimaryKeys(),
67+
tableInfo.getSchema().getAutoIncrementColumnNames(),
68+
partialUpdateColumns);
6569

6670
this.targetColumns = partialUpdateColumns;
6771
DataLakeFormat lakeFormat = tableInfo.getTableConfig().getDataLakeFormat().orElse(null);
@@ -80,9 +84,20 @@ class UpsertWriterImpl extends AbstractTableWriter implements UpsertWriter {
8084
}
8185

8286
private static void sanityCheck(
83-
RowType rowType, List<String> primaryKeys, @Nullable int[] targetColumns) {
87+
RowType rowType,
88+
List<String> primaryKeys,
89+
List<String> autoIncrementColumnNames,
90+
@Nullable int[] targetColumns) {
8491
// skip check when target columns is null
8592
if (targetColumns == null) {
93+
if (!autoIncrementColumnNames.isEmpty()) {
94+
throw new IllegalArgumentException(
95+
String.format(
96+
"This table has auto increment column %s."
97+
+ "Explicitly specifying values for an auto increment column is not allowed."
98+
+ "Please specify non-auto-increment columns as target columns using partialUpdate first.",
99+
autoIncrementColumnNames));
100+
}
86101
return;
87102
}
88103
BitSet targetColumnsSet = new BitSet();
@@ -116,6 +131,16 @@ private static void sanityCheck(
116131
}
117132
}
118133
}
134+
135+
// explicitly specifying values for an auto increment column is not allowed
136+
for (String autoIncrementColumnName : autoIncrementColumnNames) {
137+
if (targetColumnsSet.get(rowType.getFieldIndex(autoIncrementColumnName))) {
138+
throw new IllegalArgumentException(
139+
String.format(
140+
"Explicitly specifying values for the auto increment column %s is not allowed.",
141+
autoIncrementColumnName));
142+
}
143+
}
119144
}
120145

121146
/**

fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -662,6 +662,28 @@ void testInvalidPartialUpdate() throws Exception {
662662
.hasMessage(
663663
"Invalid target column index: 3 for table test_db_1.test_pk_table_1. The table only has 3 columns.");
664664
}
665+
666+
// test invalid auto increment column upsert
667+
schema =
668+
Schema.newBuilder()
669+
.column("a", DataTypes.INT())
670+
.column("b", DataTypes.INT())
671+
.primaryKey("a")
672+
.enableAutoIncrement("b")
673+
.build();
674+
tableDescriptor = TableDescriptor.builder().schema(schema).distributedBy(3, "a").build();
675+
TablePath tablePath =
676+
TablePath.of("test_db_1", "test_invalid_auto_increment_column_upsert");
677+
createTable(tablePath, tableDescriptor, true);
678+
try (Table table = conn.getTable(tablePath)) {
679+
assertThatThrownBy(() -> table.newUpsert().createWriter())
680+
.hasMessage(
681+
"This table has auto increment column [b]. Explicitly specifying values for an auto increment column is not allowed. Please specify non-auto-increment columns as target columns using partialUpdate first.");
682+
683+
assertThatThrownBy(() -> table.newUpsert().partialUpdate("a", "b").createWriter())
684+
.hasMessage(
685+
"Explicitly specifying values for the auto increment column b is not allowed.");
686+
}
665687
}
666688

667689
@Test

0 commit comments

Comments
 (0)