Skip to content

Commit 453d64b

Browse files
xx789633 and wuchong authored
[kv/auto increment column] add auto increment column in Fluss schema (#1887)
* add auto increment column in Fluss schema * support auto increment column in Flink SQL syntax --------- Co-authored-by: Jark Wu <[email protected]>
1 parent b0a4124 commit 453d64b

File tree

6 files changed

+252
-24
lines changed

6 files changed

+252
-24
lines changed

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1412,6 +1412,17 @@ public class ConfigOptions {
14121412
+ "The `disable` behavior rejects delete requests with a clear error message. "
14131413
+ "For tables with FIRST_ROW or VERSIONED merge engines, this option defaults to `ignore`.");
14141414

1415+
public static final ConfigOption<String> TABLE_AUTO_INCREMENT_FIELDS =
1416+
key("table.auto-increment.fields")
1417+
.stringType()
1418+
.noDefaultValue()
1419+
.withDescription(
1420+
"Defines the auto increment columns. "
1421+
+ "The auto increment column can only be used in primary-key table."
1422+
+ "With an auto increment column in the table, whenever a new row is inserted into the table, the new row will be assigned with the next available value from the auto-increment sequence."
1423+
+ "The auto increment column can only be used in primary-key table. The data type of the auto increment column must be INT or BIGINT."
1424+
+ "Currently a table can have only one auto-increment column.");
1425+
14151426
// ------------------------------------------------------------------------
14161427
// ConfigOptions for Kv
14171428
// ------------------------------------------------------------------------

fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java

Lines changed: 57 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.fluss.annotation.PublicStable;
2222
import org.apache.fluss.types.DataField;
2323
import org.apache.fluss.types.DataType;
24+
import org.apache.fluss.types.DataTypeRoot;
2425
import org.apache.fluss.types.RowType;
2526
import org.apache.fluss.utils.EncodingUtils;
2627
import org.apache.fluss.utils.StringUtils;
@@ -60,6 +61,7 @@ public final class Schema implements Serializable {
6061

6162
private final List<Column> columns;
6263
private final @Nullable PrimaryKey primaryKey;
64+
private final List<String> autoIncrementColumnNames;
6365
private final RowType rowType;
6466

6567
/**
@@ -68,9 +70,14 @@ public final class Schema implements Serializable {
6870
*/
6971
private final int highestFieldId;
7072

71-
private Schema(List<Column> columns, @Nullable PrimaryKey primaryKey, int highestFieldId) {
72-
this.columns = normalizeColumns(columns, primaryKey);
73+
private Schema(
74+
List<Column> columns,
75+
@Nullable PrimaryKey primaryKey,
76+
int highestFieldId,
77+
List<String> autoIncrementColumnNames) {
78+
this.columns = normalizeColumns(columns, primaryKey, autoIncrementColumnNames);
7379
this.primaryKey = primaryKey;
80+
this.autoIncrementColumnNames = autoIncrementColumnNames;
7481
// pre-create the row type as it is the most frequently used part of the schema
7582
this.rowType =
7683
new RowType(
@@ -91,6 +98,10 @@ public Optional<PrimaryKey> getPrimaryKey() {
9198
return Optional.ofNullable(primaryKey);
9299
}
93100

101+
public List<String> getAutoIncrementColumnNames() {
102+
return autoIncrementColumnNames;
103+
}
104+
94105
public RowType getRowType() {
95106
return rowType;
96107
}
@@ -205,10 +216,12 @@ public static Schema.Builder newBuilder() {
205216
public static final class Builder {
206217
private final List<Column> columns;
207218
private @Nullable PrimaryKey primaryKey;
219+
private final List<String> autoIncrementColumnNames;
208220
private AtomicInteger highestFieldId;
209221

210222
private Builder() {
211223
columns = new ArrayList<>();
224+
autoIncrementColumnNames = new ArrayList<>();
212225
highestFieldId = new AtomicInteger(-1);
213226
}
214227

@@ -360,6 +373,23 @@ public Builder primaryKeyNamed(String constraintName, List<String> columnNames)
360373
return this;
361374
}
362375

376+
/**
377+
* Declares a column to be auto-incremented. With an auto-increment column in the table,
378+
* whenever a new row is inserted into the table, the new row will be assigned with the next
379+
* available value from the auto-increment sequence. A table can have at most one auto
380+
* increment column.
381+
*
382+
* @param columnName the auto increment column name
383+
*/
384+
public Builder enableAutoIncrement(String columnName) {
385+
checkState(
386+
autoIncrementColumnNames.isEmpty(),
387+
"Multiple auto increment columns are not supported yet.");
388+
checkArgument(columnName != null, "Auto increment column name must not be null.");
389+
autoIncrementColumnNames.add(columnName);
390+
return this;
391+
}
392+
363393
/** Returns an instance of an {@link Schema}. */
364394
public Schema build() {
365395
Integer maximumColumnId =
@@ -372,7 +402,7 @@ public Schema build() {
372402
checkState(
373403
columns.stream().map(Column::getColumnId).distinct().count() == columns.size(),
374404
"Column ids must be unique.");
375-
return new Schema(columns, primaryKey, highestFieldId.get());
405+
return new Schema(columns, primaryKey, highestFieldId.get(), autoIncrementColumnNames);
376406
}
377407
}
378408

@@ -522,7 +552,9 @@ public int hashCode() {
522552

523553
/** Normalize columns and primary key. */
524554
private static List<Column> normalizeColumns(
525-
List<Column> columns, @Nullable PrimaryKey primaryKey) {
555+
List<Column> columns,
556+
@Nullable PrimaryKey primaryKey,
557+
List<String> autoIncrementColumnNames) {
526558

527559
List<String> columnNames =
528560
columns.stream().map(Column::getName).collect(Collectors.toList());
@@ -536,6 +568,9 @@ private static List<Column> normalizeColumns(
536568
Set<String> allFields = new HashSet<>(columnNames);
537569

538570
if (primaryKey == null) {
571+
checkState(
572+
autoIncrementColumnNames.isEmpty(),
573+
"Auto increment column can only be used in primary-key table.");
539574
return Collections.unmodifiableList(columns);
540575
}
541576

@@ -552,10 +587,27 @@ private static List<Column> normalizeColumns(
552587
columnNames,
553588
primaryKeyNames);
554589

555-
// primary key should not nullable
556590
Set<String> pkSet = new HashSet<>(primaryKeyNames);
591+
for (String autoIncrementColumn : autoIncrementColumnNames) {
592+
checkState(
593+
allFields.contains(autoIncrementColumn),
594+
"Auto increment column %s does not exist in table columns %s.",
595+
autoIncrementColumn,
596+
columnNames);
597+
checkState(
598+
!pkSet.contains(autoIncrementColumn),
599+
"Auto increment column can not be used as the primary key.");
600+
}
557601
List<Column> newColumns = new ArrayList<>();
558602
for (Column column : columns) {
603+
if (autoIncrementColumnNames.contains(column.getName())) {
604+
checkState(
605+
column.getDataType().is(DataTypeRoot.INTEGER)
606+
|| column.getDataType().is(DataTypeRoot.BIGINT),
607+
"The data type of auto increment column must be INT or BIGINT.");
608+
}
609+
610+
// primary key should not nullable
559611
if (pkSet.contains(column.getName()) && column.getDataType().isNullable()) {
560612
newColumns.add(
561613
new Column(

fluss-common/src/main/java/org/apache/fluss/utils/json/SchemaJsonSerde.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public class SchemaJsonSerde implements JsonSerializer<Schema>, JsonDeserializer
3636

3737
private static final String COLUMNS_NAME = "columns";
3838
private static final String PRIMARY_KEY_NAME = "primary_key";
39+
private static final String AUTO_INCREMENT_COLUMN_NAME = "auto_increment_column";
3940
private static final String VERSION_KEY = "version";
4041
private static final String HIGHEST_FIELD_ID = "highest_field_id";
4142
private static final int VERSION = 1;
@@ -62,6 +63,14 @@ public void serialize(Schema schema, JsonGenerator generator) throws IOException
6263
}
6364
generator.writeEndArray();
6465
}
66+
List<String> autoIncrementColumnNames = schema.getAutoIncrementColumnNames();
67+
if (!autoIncrementColumnNames.isEmpty()) {
68+
generator.writeArrayFieldStart(AUTO_INCREMENT_COLUMN_NAME);
69+
for (String columnName : autoIncrementColumnNames) {
70+
generator.writeString(columnName);
71+
}
72+
generator.writeEndArray();
73+
}
6574

6675
generator.writeNumberField(HIGHEST_FIELD_ID, schema.getHighestFieldId());
6776

@@ -86,6 +95,14 @@ public Schema deserialize(JsonNode node) {
8695
builder.primaryKey(primaryKeys);
8796
}
8897

98+
if (node.has(AUTO_INCREMENT_COLUMN_NAME)) {
99+
Iterator<JsonNode> autoIncrementColumnJsons =
100+
node.get(AUTO_INCREMENT_COLUMN_NAME).elements();
101+
while (autoIncrementColumnJsons.hasNext()) {
102+
builder.enableAutoIncrement(autoIncrementColumnJsons.next().asText());
103+
}
104+
}
105+
89106
if (node.has(HIGHEST_FIELD_ID)) {
90107
builder.highestFieldId(node.get(HIGHEST_FIELD_ID).asInt());
91108
}
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.metadata;
19+
20+
import org.apache.fluss.types.DataTypes;
21+
22+
import org.junit.jupiter.api.Test;
23+
24+
import java.util.Collections;
25+
26+
import static org.assertj.core.api.Assertions.assertThat;
27+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
28+
29+
/** Tests for {@link org.apache.fluss.metadata.Schema}. */
30+
class TableSchemaTest {
31+
32+
@Test
33+
void testAutoIncrementColumnSchema() {
34+
assertThatThrownBy(
35+
() ->
36+
Schema.newBuilder()
37+
.column("f0", DataTypes.STRING())
38+
.column("f1", DataTypes.BIGINT())
39+
.column("f3", DataTypes.STRING())
40+
.primaryKey("f0")
41+
.primaryKey("f0")
42+
.build())
43+
.isInstanceOf(IllegalStateException.class)
44+
.hasMessage("Multiple primary keys are not supported.");
45+
46+
assertThat(
47+
Schema.newBuilder()
48+
.column("f0", DataTypes.STRING())
49+
.column("f1", DataTypes.BIGINT())
50+
.column("f3", DataTypes.STRING())
51+
.enableAutoIncrement("f1")
52+
.primaryKey("f0")
53+
.build()
54+
.getAutoIncrementColumnNames())
55+
.isEqualTo(Collections.singletonList("f1"));
56+
assertThat(
57+
Schema.newBuilder()
58+
.column("f0", DataTypes.STRING())
59+
.column("f1", DataTypes.BIGINT())
60+
.column("f3", DataTypes.STRING())
61+
.primaryKey("f0")
62+
.build()
63+
.getAutoIncrementColumnNames())
64+
.isEmpty();
65+
66+
assertThatThrownBy(
67+
() ->
68+
Schema.newBuilder()
69+
.column("f0", DataTypes.STRING())
70+
.column("f1", DataTypes.BIGINT())
71+
.column("f3", DataTypes.STRING())
72+
.enableAutoIncrement("f0")
73+
.primaryKey("f0")
74+
.build())
75+
.isInstanceOf(IllegalStateException.class)
76+
.hasMessage("Auto increment column can not be used as the primary key.");
77+
78+
assertThatThrownBy(
79+
() ->
80+
Schema.newBuilder()
81+
.column("f0", DataTypes.STRING())
82+
.column("f1", DataTypes.BIGINT())
83+
.column("f3", DataTypes.STRING())
84+
.enableAutoIncrement("f1")
85+
.enableAutoIncrement("f1")
86+
.primaryKey("f0")
87+
.build())
88+
.isInstanceOf(IllegalStateException.class)
89+
.hasMessage("Multiple auto increment columns are not supported yet.");
90+
assertThatThrownBy(
91+
() ->
92+
Schema.newBuilder()
93+
.column("f0", DataTypes.STRING())
94+
.column("f1", DataTypes.BIGINT())
95+
.column("f3", DataTypes.STRING())
96+
.enableAutoIncrement("f3")
97+
.primaryKey("f0")
98+
.build())
99+
.isInstanceOf(IllegalStateException.class)
100+
.hasMessage("The data type of auto increment column must be INT or BIGINT.");
101+
assertThatThrownBy(
102+
() ->
103+
Schema.newBuilder()
104+
.column("f0", DataTypes.STRING())
105+
.column("f1", DataTypes.BIGINT())
106+
.column("f3", DataTypes.STRING())
107+
.enableAutoIncrement("f4")
108+
.primaryKey("f0")
109+
.build())
110+
.isInstanceOf(IllegalStateException.class)
111+
.hasMessage(
112+
"Auto increment column f4 does not exist in table columns [f0, f1, f3].");
113+
assertThatThrownBy(
114+
() ->
115+
Schema.newBuilder()
116+
.column("f0", DataTypes.STRING())
117+
.column("f1", DataTypes.BIGINT())
118+
.column("f3", DataTypes.STRING())
119+
.enableAutoIncrement("f1")
120+
.build())
121+
.isInstanceOf(IllegalStateException.class)
122+
.hasMessage("Auto increment column can only be used in primary-key table.");
123+
}
124+
}

fluss-common/src/test/java/org/apache/fluss/utils/json/SchemaJsonSerdeTest.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,18 @@ public class SchemaJsonSerdeTest extends JsonSerdeTestBase<Schema> {
6666
.withComment("c is third column")
6767
.build();
6868

69+
static final Schema SCHEMA_4 =
70+
Schema.newBuilder()
71+
.column("a", DataTypes.INT())
72+
.withComment("a is first column")
73+
.column("b", DataTypes.INT())
74+
.withComment("b is second column")
75+
.column("c", DataTypes.CHAR(10))
76+
.withComment("c is third column")
77+
.primaryKey("a", "c")
78+
.enableAutoIncrement("b")
79+
.build();
80+
6981
static final String SCHEMA_JSON_0 =
7082
"{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"a is first column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"STRING\"},\"comment\":\"b is second column\",\"id\":1},{\"name\":\"c\",\"data_type\":{\"type\":\"CHAR\",\"nullable\":false,\"length\":10},\"comment\":\"c is third column\",\"id\":2}],\"primary_key\":[\"a\",\"c\"],\"highest_field_id\":2}";
7183
static final String SCHEMA_JSON_1 =
@@ -74,18 +86,23 @@ public class SchemaJsonSerdeTest extends JsonSerdeTestBase<Schema> {
7486
static final String SCHEMA_JSON_3 =
7587
"{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"BIGINT\"},\"comment\":\"a is first column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"STRING\"},\"comment\":\"b is second column\",\"id\":1},{\"name\":\"c\",\"data_type\":{\"type\":\"TIMESTAMP_WITHOUT_TIME_ZONE\",\"precision\":6},\"comment\":\"c is third column\",\"id\":2}],\"highest_field_id\":2}";
7688

89+
static final String SCHEMA_JSON_4 =
90+
"{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"a is first column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"INTEGER\"},\"comment\":\"b is second column\",\"id\":1},{\"name\":\"c\",\"data_type\":{\"type\":\"CHAR\",\"nullable\":false,\"length\":10},\"comment\":\"c is third column\",\"id\":2}],\"primary_key\":[\"a\",\"c\"],\"auto_increment_column\":[\"b\"],\"highest_field_id\":2}";
91+
7792
SchemaJsonSerdeTest() {
7893
super(SchemaJsonSerde.INSTANCE);
7994
}
8095

8196
@Override
8297
protected Schema[] createObjects() {
83-
return new Schema[] {SCHEMA_0, SCHEMA_1, SCHEMA_2, SCHEMA_3};
98+
return new Schema[] {SCHEMA_0, SCHEMA_1, SCHEMA_2, SCHEMA_3, SCHEMA_4};
8499
}
85100

86101
@Override
87102
protected String[] expectedJsons() {
88-
return new String[] {SCHEMA_JSON_0, SCHEMA_JSON_1, SCHEMA_JSON_1, SCHEMA_JSON_3};
103+
return new String[] {
104+
SCHEMA_JSON_0, SCHEMA_JSON_1, SCHEMA_JSON_1, SCHEMA_JSON_3, SCHEMA_JSON_4
105+
};
89106
}
90107

91108
@Test

0 commit comments

Comments
 (0)