Skip to content

Commit 872554b

Browse files
authored
[kv/auto increment column] constrain upsert behavior for auto increment column (#2117)
* constrain upsert behavior for auto increment column * auto increment column should not be nullable
1 parent c6af64d commit 872554b

File tree

4 files changed

+60
-7
lines changed

4 files changed

+60
-7
lines changed

fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,11 @@ class UpsertWriterImpl extends AbstractTableWriter implements UpsertWriter {
6161
WriterClient writerClient) {
6262
super(tablePath, tableInfo, writerClient);
6363
RowType rowType = tableInfo.getRowType();
64-
sanityCheck(rowType, tableInfo.getPrimaryKeys(), partialUpdateColumns);
64+
sanityCheck(
65+
rowType,
66+
tableInfo.getPrimaryKeys(),
67+
tableInfo.getSchema().getAutoIncrementColumnNames(),
68+
partialUpdateColumns);
6569

6670
this.targetColumns = partialUpdateColumns;
6771
DataLakeFormat lakeFormat = tableInfo.getTableConfig().getDataLakeFormat().orElse(null);
@@ -80,9 +84,20 @@ class UpsertWriterImpl extends AbstractTableWriter implements UpsertWriter {
8084
}
8185

8286
private static void sanityCheck(
83-
RowType rowType, List<String> primaryKeys, @Nullable int[] targetColumns) {
87+
RowType rowType,
88+
List<String> primaryKeys,
89+
List<String> autoIncrementColumnNames,
90+
@Nullable int[] targetColumns) {
8491
// skip check when target columns is null
8592
if (targetColumns == null) {
93+
if (!autoIncrementColumnNames.isEmpty()) {
94+
throw new IllegalArgumentException(
95+
String.format(
96+
"This table has auto increment column %s. "
97+
+ "Explicitly specifying values for an auto increment column is not allowed. "
98+
+ "Please specify non-auto-increment columns as target columns using partialUpdate first.",
99+
autoIncrementColumnNames));
100+
}
86101
return;
87102
}
88103
BitSet targetColumnsSet = new BitSet();
@@ -103,10 +118,23 @@ private static void sanityCheck(
103118
pkColumnSet.set(pkIndex);
104119
}
105120

121+
BitSet autoIncrementColumnSet = new BitSet();
122+
// explicitly specifying values for an auto increment column is not allowed
123+
for (String autoIncrementColumnName : autoIncrementColumnNames) {
124+
int autoIncrementColumnIndex = rowType.getFieldIndex(autoIncrementColumnName);
125+
if (targetColumnsSet.get(autoIncrementColumnIndex)) {
126+
throw new IllegalArgumentException(
127+
String.format(
128+
"Explicitly specifying values for the auto increment column %s is not allowed.",
129+
autoIncrementColumnName));
130+
}
131+
autoIncrementColumnSet.set(autoIncrementColumnIndex);
132+
}
133+
106134
// check the columns not in targetColumns should be nullable
107135
for (int i = 0; i < rowType.getFieldCount(); i++) {
108-
// column not in primary key
109-
if (!pkColumnSet.get(i)) {
136+
// column not in primary key and not in auto increment column
137+
if (!pkColumnSet.get(i) && !autoIncrementColumnSet.get(i)) {
110138
// the column should be nullable
111139
if (!rowType.getTypeAt(i).isNullable()) {
112140
throw new IllegalArgumentException(

fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -662,6 +662,29 @@ void testInvalidPartialUpdate() throws Exception {
662662
.hasMessage(
663663
"Invalid target column index: 3 for table test_db_1.test_pk_table_1. The table only has 3 columns.");
664664
}
665+
666+
// test invalid auto increment column upsert
667+
schema =
668+
Schema.newBuilder()
669+
.column("a", DataTypes.INT())
670+
.column("b", DataTypes.INT())
671+
.column("c", DataTypes.INT())
672+
.primaryKey("a")
673+
.enableAutoIncrement("c")
674+
.build();
675+
tableDescriptor = TableDescriptor.builder().schema(schema).distributedBy(3, "a").build();
676+
TablePath tablePath =
677+
TablePath.of("test_db_1", "test_invalid_auto_increment_column_upsert");
678+
createTable(tablePath, tableDescriptor, true);
679+
try (Table table = conn.getTable(tablePath)) {
680+
assertThatThrownBy(() -> table.newUpsert().createWriter())
681+
.hasMessage(
682+
"This table has auto increment column [c]. Explicitly specifying values for an auto increment column is not allowed. Please specify non-auto-increment columns as target columns using partialUpdate first.");
683+
684+
assertThatThrownBy(() -> table.newUpsert().partialUpdate("a", "c").createWriter())
685+
.hasMessage(
686+
"Explicitly specifying values for the auto increment column c is not allowed.");
687+
}
665688
}
666689

667690
@Test

fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -607,8 +607,10 @@ private static List<Column> normalizeColumns(
607607
"The data type of auto increment column must be INT or BIGINT.");
608608
}
609609

610-
// primary key should not nullable
611-
if (pkSet.contains(column.getName()) && column.getDataType().isNullable()) {
610+
// primary key and auto increment column should not nullable
611+
if ((pkSet.contains(column.getName())
612+
|| autoIncrementColumnNames.contains(column.getName()))
613+
&& column.getDataType().isNullable()) {
612614
newColumns.add(
613615
new Column(
614616
column.getName(),

fluss-common/src/test/java/org/apache/fluss/utils/json/SchemaJsonSerdeTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public class SchemaJsonSerdeTest extends JsonSerdeTestBase<Schema> {
8787
"{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"BIGINT\"},\"comment\":\"a is first column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"STRING\"},\"comment\":\"b is second column\",\"id\":1},{\"name\":\"c\",\"data_type\":{\"type\":\"TIMESTAMP_WITHOUT_TIME_ZONE\",\"precision\":6},\"comment\":\"c is third column\",\"id\":2}],\"highest_field_id\":2}";
8888

8989
static final String SCHEMA_JSON_4 =
90-
"{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"a is first column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"INTEGER\"},\"comment\":\"b is second column\",\"id\":1},{\"name\":\"c\",\"data_type\":{\"type\":\"CHAR\",\"nullable\":false,\"length\":10},\"comment\":\"c is third column\",\"id\":2}],\"primary_key\":[\"a\",\"c\"],\"auto_increment_column\":[\"b\"],\"highest_field_id\":2}";
90+
"{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"a is first column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"b is second column\",\"id\":1},{\"name\":\"c\",\"data_type\":{\"type\":\"CHAR\",\"nullable\":false,\"length\":10},\"comment\":\"c is third column\",\"id\":2}],\"primary_key\":[\"a\",\"c\"],\"auto_increment_column\":[\"b\"],\"highest_field_id\":2}";
9191

9292
SchemaJsonSerdeTest() {
9393
super(SchemaJsonSerde.INSTANCE);

0 commit comments

Comments
 (0)