Skip to content

Commit 0de7691

Browse files
committed
move auto incr columns to schema level
1 parent 91653e6 commit 0de7691

File tree

6 files changed

+73
-62
lines changed

6 files changed

+73
-62
lines changed

fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java

Lines changed: 27 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -61,14 +61,16 @@ public final class Schema implements Serializable {
6161

6262
private final List<Column> columns;
6363
private final @Nullable PrimaryKey primaryKey;
64+
private final List<String> autoIncrementColumnNames;
6465
private final RowType rowType;
6566

6667
private Schema(
6768
List<Column> columns,
6869
@Nullable PrimaryKey primaryKey,
69-
@Nullable String autoIncrementColumn) {
70-
this.columns = normalizeColumns(columns, primaryKey, autoIncrementColumn);
70+
List<String> autoIncrementColumnNames) {
71+
this.columns = normalizeColumns(columns, primaryKey, autoIncrementColumnNames);
7172
this.primaryKey = primaryKey;
73+
this.autoIncrementColumnNames = autoIncrementColumnNames;
7274
// pre-create the row type as it is the most frequently used part of the schema
7375
this.rowType =
7476
new RowType(
@@ -88,6 +90,10 @@ public Optional<PrimaryKey> getPrimaryKey() {
8890
return Optional.ofNullable(primaryKey);
8991
}
9092

93+
public List<String> getAutoIncrementColumnNames() {
94+
return autoIncrementColumnNames;
95+
}
96+
9197
public RowType getRowType() {
9298
return rowType;
9399
}
@@ -192,10 +198,11 @@ public static Schema.Builder newBuilder() {
192198
public static final class Builder {
193199
private final List<Column> columns;
194200
private @Nullable PrimaryKey primaryKey;
195-
private @Nullable String autoIncrementColumn;
201+
private final List<String> autoIncrementColumnNames;
196202

197203
private Builder() {
198204
columns = new ArrayList<>();
205+
autoIncrementColumnNames = new ArrayList<>();
199206
}
200207

201208
/** Adopts all members from the given schema. */
@@ -329,16 +336,16 @@ public Builder primaryKeyNamed(String constraintName, List<String> columnNames)
329336
*/
330337
public Builder enableAutoIncrement(String columnName) {
331338
checkState(
332-
autoIncrementColumn == null,
333-
"Multiple auto increment columns are not supported.");
339+
autoIncrementColumnNames.isEmpty(),
340+
"Multiple auto increment columns are not supported yet.");
334341
checkArgument(columnName != null, "Auto increment column name must not be null.");
335-
autoIncrementColumn = columnName;
342+
autoIncrementColumnNames.add(columnName);
336343
return this;
337344
}
338345

339346
/** Returns an instance of an {@link Schema}. */
340347
public Schema build() {
341-
return new Schema(columns, primaryKey, autoIncrementColumn);
348+
return new Schema(columns, primaryKey, autoIncrementColumnNames);
342349
}
343350
}
344351

@@ -357,39 +364,21 @@ public static final class Column implements Serializable {
357364
private final String columnName;
358365
private final DataType dataType;
359366
private final @Nullable String comment;
360-
private final Boolean autoIncrement;
361367

362368
public Column(String columnName, DataType dataType) {
363369
this(columnName, dataType, null);
364370
}
365371

366-
public Column(String columnName, DataType dataType, boolean autoIncrement) {
367-
this(columnName, dataType, null, autoIncrement);
368-
}
369-
370372
public Column(String columnName, DataType dataType, @Nullable String comment) {
371-
this(columnName, dataType, comment, false);
372-
}
373-
374-
public Column(
375-
String columnName,
376-
DataType dataType,
377-
@Nullable String comment,
378-
Boolean autoIncrement) {
379373
this.columnName = columnName;
380374
this.dataType = dataType;
381375
this.comment = comment;
382-
this.autoIncrement = autoIncrement;
383376
}
384377

385378
public String getName() {
386379
return columnName;
387380
}
388381

389-
public Boolean getAutoIncrement() {
390-
return autoIncrement;
391-
}
392-
393382
public Optional<String> getComment() {
394383
return Optional.ofNullable(comment);
395384
}
@@ -399,16 +388,13 @@ public DataType getDataType() {
399388
}
400389

401390
public Column withComment(String comment) {
402-
return new Column(columnName, dataType, comment, autoIncrement);
391+
return new Column(columnName, dataType, comment);
403392
}
404393

405394
@Override
406395
public String toString() {
407396
final StringBuilder sb = new StringBuilder();
408397
sb.append(columnName).append(" ").append(dataType.toString());
409-
if (autoIncrement) {
410-
sb.append(" AUTO INCREMENT ");
411-
}
412398
getComment()
413399
.ifPresent(
414400
c -> {
@@ -430,13 +416,12 @@ public boolean equals(Object o) {
430416
Column that = (Column) o;
431417
return Objects.equals(columnName, that.columnName)
432418
&& Objects.equals(dataType, that.dataType)
433-
&& Objects.equals(comment, that.comment)
434-
&& Objects.equals(autoIncrement, that.autoIncrement);
419+
&& Objects.equals(comment, that.comment);
435420
}
436421

437422
@Override
438423
public int hashCode() {
439-
return Objects.hash(columnName, dataType, comment, autoIncrement);
424+
return Objects.hash(columnName, dataType, comment);
440425
}
441426
}
442427

@@ -499,7 +484,7 @@ public int hashCode() {
499484
private static List<Column> normalizeColumns(
500485
List<Column> columns,
501486
@Nullable PrimaryKey primaryKey,
502-
@Nullable String autoIncrementColumn) {
487+
List<String> autoIncrementColumnNames) {
503488

504489
List<String> columnNames =
505490
columns.stream().map(Column::getName).collect(Collectors.toList());
@@ -514,7 +499,7 @@ private static List<Column> normalizeColumns(
514499

515500
if (primaryKey == null) {
516501
checkState(
517-
autoIncrementColumn == null,
502+
autoIncrementColumnNames.isEmpty(),
518503
"Auto increment column can only be used in primary-key table.");
519504
return Collections.unmodifiableList(columns);
520505
}
@@ -533,7 +518,7 @@ private static List<Column> normalizeColumns(
533518
primaryKeyNames);
534519

535520
Set<String> pkSet = new HashSet<>(primaryKeyNames);
536-
if (autoIncrementColumn != null) {
521+
for (String autoIncrementColumn : autoIncrementColumnNames) {
537522
checkState(
538523
allFields.contains(autoIncrementColumn),
539524
"Auto increment column %s does not exist in table columns %s.",
@@ -546,6 +531,13 @@ private static List<Column> normalizeColumns(
546531
// primary key should not nullable
547532
List<Column> newColumns = new ArrayList<>();
548533
for (Column column : columns) {
534+
if (autoIncrementColumnNames.contains(column.getName())) {
535+
checkState(
536+
column.getDataType().is(DataTypeRoot.INTEGER)
537+
|| column.getDataType().is(DataTypeRoot.BIGINT),
538+
"The data type of auto increment column must be INT or BIGINT.");
539+
}
540+
549541
if (pkSet.contains(column.getName()) && column.getDataType().isNullable()) {
550542
newColumns.add(
551543
new Column(
@@ -554,17 +546,6 @@ private static List<Column> normalizeColumns(
554546
column.getComment().isPresent()
555547
? column.getComment().get()
556548
: null));
557-
} else if (Objects.equals(column.getName(), autoIncrementColumn)) {
558-
checkState(
559-
column.getDataType().is(DataTypeRoot.INTEGER)
560-
|| column.getDataType().is(DataTypeRoot.BIGINT),
561-
"The data type of auto increment column must be INT or BIGINT.");
562-
newColumns.add(
563-
new Column(
564-
column.getName(),
565-
column.getDataType().copy(),
566-
column.getComment().isPresent() ? column.getComment().get() : null,
567-
true));
568549
} else {
569550
newColumns.add(column);
570551
}

fluss-common/src/main/java/org/apache/fluss/utils/json/ColumnJsonSerde.java

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ public class ColumnJsonSerde
3434
static final String NAME = "name";
3535
static final String DATA_TYPE = "data_type";
3636
static final String COMMENT = "comment";
37-
static final String AUTO_INCREMENT = "auto_increment";
3837

3938
@Override
4039
public void serialize(Schema.Column column, JsonGenerator generator) throws IOException {
@@ -45,9 +44,6 @@ public void serialize(Schema.Column column, JsonGenerator generator) throws IOEx
4544
generator.writeFieldName(DATA_TYPE);
4645
DataTypeJsonSerde.INSTANCE.serialize(column.getDataType(), generator);
4746

48-
if (column.getAutoIncrement()) {
49-
generator.writeBooleanField(AUTO_INCREMENT, true);
50-
}
5147
if (column.getComment().isPresent()) {
5248
generator.writeStringField(COMMENT, column.getComment().get());
5349
}
@@ -60,12 +56,8 @@ public Schema.Column deserialize(JsonNode node) {
6056
String columnName = node.required(NAME).asText();
6157

6258
DataType dataType = DataTypeJsonSerde.INSTANCE.deserialize(node.get(DATA_TYPE));
63-
Schema.Column column;
64-
if (node.has(AUTO_INCREMENT)) {
65-
column = new Schema.Column(columnName, dataType, node.get(AUTO_INCREMENT).asBoolean());
66-
} else {
67-
column = new Schema.Column(columnName, dataType);
68-
}
59+
Schema.Column column = new Schema.Column(columnName, dataType);
60+
6961
if (node.hasNonNull(COMMENT)) {
7062
column = column.withComment(node.get(COMMENT).asText());
7163
}

fluss-common/src/main/java/org/apache/fluss/utils/json/SchemaJsonSerde.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public class SchemaJsonSerde implements JsonSerializer<Schema>, JsonDeserializer
3636

3737
private static final String COLUMNS_NAME = "columns";
3838
private static final String PRIMARY_KEY_NAME = "primary_key";
39+
private static final String AUTO_INCREMENT_COLUMN_NAME = "auto_increment_column";
3940
private static final String VERSION_KEY = "version";
4041
private static final int VERSION = 1;
4142

@@ -61,6 +62,14 @@ public void serialize(Schema schema, JsonGenerator generator) throws IOException
6162
}
6263
generator.writeEndArray();
6364
}
65+
List<String> autoIncrementColumnNames = schema.getAutoIncrementColumnNames();
66+
if (!autoIncrementColumnNames.isEmpty()) {
67+
generator.writeArrayFieldStart(AUTO_INCREMENT_COLUMN_NAME);
68+
for (String columnName : autoIncrementColumnNames) {
69+
generator.writeString(columnName);
70+
}
71+
generator.writeEndArray();
72+
}
6473

6574
generator.writeEndObject();
6675
}
@@ -83,6 +92,14 @@ public Schema deserialize(JsonNode node) {
8392
builder.primaryKey(primaryKeys);
8493
}
8594

95+
if (node.has(AUTO_INCREMENT_COLUMN_NAME)) {
96+
Iterator<JsonNode> autoIncrementColumnJsons =
97+
node.get(AUTO_INCREMENT_COLUMN_NAME).elements();
98+
while (autoIncrementColumnJsons.hasNext()) {
99+
builder.enableAutoIncrement(autoIncrementColumnJsons.next().asText());
100+
}
101+
}
102+
86103
return builder.build();
87104
}
88105
}

fluss-common/src/test/java/org/apache/fluss/metadata/TableSchemaTest.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@
2121

2222
import org.junit.jupiter.api.Test;
2323

24+
import java.util.Collections;
25+
26+
import static org.assertj.core.api.Assertions.assertThat;
2427
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2528

2629
/** Tests for {@link org.apache.fluss.metadata.Schema}. */
@@ -40,6 +43,26 @@ void testAutoIncrementColumnSchema() {
4043
.isInstanceOf(IllegalStateException.class)
4144
.hasMessage("Multiple primary keys are not supported.");
4245

46+
assertThat(
47+
Schema.newBuilder()
48+
.column("f0", DataTypes.STRING())
49+
.column("f1", DataTypes.BIGINT())
50+
.column("f3", DataTypes.STRING())
51+
.enableAutoIncrement("f1")
52+
.primaryKey("f0")
53+
.build()
54+
.getAutoIncrementColumnNames())
55+
.isEqualTo(Collections.singletonList("f1"));
56+
assertThat(
57+
Schema.newBuilder()
58+
.column("f0", DataTypes.STRING())
59+
.column("f1", DataTypes.BIGINT())
60+
.column("f3", DataTypes.STRING())
61+
.primaryKey("f0")
62+
.build()
63+
.getAutoIncrementColumnNames())
64+
.isEmpty();
65+
4366
assertThatThrownBy(
4467
() ->
4568
Schema.newBuilder()
@@ -63,7 +86,7 @@ void testAutoIncrementColumnSchema() {
6386
.primaryKey("f0")
6487
.build())
6588
.isInstanceOf(IllegalStateException.class)
66-
.hasMessage("Multiple auto increment columns are not supported.");
89+
.hasMessage("Multiple auto increment columns are not supported yet.");
6790
assertThatThrownBy(
6891
() ->
6992
Schema.newBuilder()

fluss-common/src/test/java/org/apache/fluss/utils/json/ColumnJsonSerdeTest.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,10 @@ protected ColumnJsonSerdeTest() {
2929

3030
@Override
3131
protected Schema.Column[] createObjects() {
32-
Schema.Column[] columns = new Schema.Column[4];
32+
Schema.Column[] columns = new Schema.Column[3];
3333
columns[0] = new Schema.Column("a", DataTypes.STRING());
3434
columns[1] = new Schema.Column("b", DataTypes.INT(), "hello b");
3535
columns[2] = new Schema.Column("c", new IntType(false), "hello c");
36-
columns[3] = new Schema.Column("d", new IntType(false), "hello d", true);
3736
return columns;
3837
}
3938

@@ -42,8 +41,7 @@ protected String[] expectedJsons() {
4241
return new String[] {
4342
"{\"name\":\"a\",\"data_type\":{\"type\":\"STRING\"}}",
4443
"{\"name\":\"b\",\"data_type\":{\"type\":\"INTEGER\"},\"comment\":\"hello b\"}",
45-
"{\"name\":\"c\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"hello c\"}",
46-
"{\"name\":\"d\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"auto_increment\":true,\"comment\":\"hello d\"}"
44+
"{\"name\":\"c\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"hello c\"}"
4745
};
4846
}
4947
}

fluss-common/src/test/java/org/apache/fluss/utils/json/SchemaJsonSerdeTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ public class SchemaJsonSerdeTest extends JsonSerdeTestBase<Schema> {
8484
"{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"BIGINT\"},\"comment\":\"a is first column\"},{\"name\":\"b\",\"data_type\":{\"type\":\"STRING\"},\"comment\":\"b is second column\"},{\"name\":\"c\",\"data_type\":{\"type\":\"TIMESTAMP_WITHOUT_TIME_ZONE\",\"precision\":6},\"comment\":\"c is third column\"}]}";
8585

8686
static final String SCHEMA_JSON_4 =
87-
"{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"a is first column\"},{\"name\":\"b\",\"data_type\":{\"type\":\"INTEGER\"},\"auto_increment\":true,\"comment\":\"b is second column\"},{\"name\":\"c\",\"data_type\":{\"type\":\"CHAR\",\"nullable\":false,\"length\":10},\"comment\":\"c is third column\"}],\"primary_key\":[\"a\",\"c\"]}";
87+
"{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"a is first column\"},{\"name\":\"b\",\"data_type\":{\"type\":\"INTEGER\"},\"comment\":\"b is second column\"},{\"name\":\"c\",\"data_type\":{\"type\":\"CHAR\",\"nullable\":false,\"length\":10},\"comment\":\"c is third column\"}],\"primary_key\":[\"a\",\"c\"],\"auto_increment_column\":[\"b\"]}";
8888

8989
SchemaJsonSerdeTest() {
9090
super(SchemaJsonSerde.INSTANCE);

0 commit comments

Comments
 (0)