Skip to content

Commit b8f6590

Browse files
committed
[server] Tolerate lake table existent if the schema and properties matches
1 parent 61475a8 commit b8f6590

File tree

18 files changed

+973
-55
lines changed

18 files changed

+973
-55
lines changed

fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@
1818
package org.apache.fluss.lake.lakestorage;
1919

2020
import org.apache.fluss.annotation.PublicEvolving;
21+
import org.apache.fluss.exception.InvalidConfigException;
2122
import org.apache.fluss.exception.TableAlreadyExistException;
2223
import org.apache.fluss.exception.TableNotExistException;
2324
import org.apache.fluss.metadata.TableChange;
2425
import org.apache.fluss.metadata.TableDescriptor;
2526
import org.apache.fluss.metadata.TablePath;
2627

2728
import java.util.List;
29+
import java.util.Map;
2830

2931
/**
3032
* A catalog interface to modify metadata in external datalake.
@@ -34,6 +36,14 @@
3436
@PublicEvolving
3537
public interface LakeCatalog extends AutoCloseable {
3638

39+
/**
40+
* Get a table in lake.
41+
*
42+
* @param tablePath path of the table to be created
43+
* @throws TableNotExistException if the table not exists
44+
*/
45+
TableDescriptor getTable(TablePath tablePath) throws TableNotExistException;
46+
3747
/**
3848
* Create a new table in lake.
3949
*
@@ -54,6 +64,26 @@ void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
5464
void alterTable(TablePath tablePath, List<TableChange> tableChanges)
5565
throws TableNotExistException;
5666

67+
/**
68+
* Check whether the properties of the existing lake table are compatible with the properties of
69+
* the table want to be created.
70+
*
71+
* @param properties The properties to be validated
72+
*/
73+
default void validateTablePropertyCompatibility(
74+
Map<String, String> existingProperties, Map<String, String> properties) {
75+
if (!existingProperties.equals(properties)) {
76+
throw new InvalidConfigException(
77+
"The properties of the existing lake table are not compatible with "
78+
+ "the properties of the table want to be created. "
79+
+ "Properties of existing lake table: "
80+
+ existingProperties
81+
+ ", "
82+
+ "properties of lake table to be created: "
83+
+ properties);
84+
}
85+
}
86+
5787
@Override
5888
default void close() throws Exception {
5989
// default do nothing

fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/PluginLakeStorageWrapper.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.fluss.utils.WrappingProxy;
3030

3131
import java.util.List;
32+
import java.util.Map;
3233

3334
/**
3435
* A wrapper around {@link LakeStoragePlugin} that ensures the plugin classloader is used for all
@@ -74,6 +75,13 @@ private ClassLoaderFixingLakeCatalog(final LakeCatalog inner, final ClassLoader
7475
this.loader = loader;
7576
}
7677

78+
@Override
79+
public TableDescriptor getTable(TablePath tablePath) throws TableNotExistException {
80+
try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(loader)) {
81+
return inner.getTable(tablePath);
82+
}
83+
}
84+
7785
@Override
7886
public void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
7987
throws TableAlreadyExistException {
@@ -90,6 +98,14 @@ public void alterTable(TablePath tablePath, List<TableChange> tableChanges)
9098
}
9199
}
92100

101+
@Override
102+
public void validateTablePropertyCompatibility(
103+
Map<String, String> existingProperties, Map<String, String> properties) {
104+
try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(loader)) {
105+
inner.validateTablePropertyCompatibility(existingProperties, properties);
106+
}
107+
}
108+
93109
@Override
94110
public void close() throws Exception {
95111
try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(loader)) {

fluss-common/src/main/java/org/apache/fluss/types/DataType.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,14 @@ public String asSummaryString() {
139139

140140
public abstract <R> R accept(DataTypeVisitor<R> visitor);
141141

142+
public DataType notNull() {
143+
return copy(false);
144+
}
145+
146+
public DataType nullable() {
147+
return copy(true);
148+
}
149+
142150
@Override
143151
public String toString() {
144152
return asSummaryString();

fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/LakeStorageTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,11 @@ public LakeSource<?> createLakeSource(TablePath tablePath) {
145145

146146
private static class TestPaimonLakeCatalog implements LakeCatalog {
147147

148+
@Override
149+
public TableDescriptor getTable(TablePath tablePath) throws TableNotExistException {
150+
return null;
151+
}
152+
148153
@Override
149154
public void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
150155
throws TableAlreadyExistException {}

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -308,13 +308,8 @@ void testCreateAlreadyExistsLakeTable() throws Exception {
308308
assertThat(catalog.tableExists(lakeTablePath)).isTrue();
309309
// drop fluss table
310310
catalog.dropTable(lakeTablePath, false);
311-
// create the table again, should throw exception with ignore if exist = false
312-
assertThatThrownBy(() -> catalog.createTable(lakeTablePath, table, false))
313-
.isInstanceOf(CatalogException.class)
314-
.hasMessage(
315-
String.format(
316-
"The table %s already exists in %s catalog, please first drop the table in %s catalog or use a new table name.",
317-
lakeTablePath, "paimon", "paimon"));
311+
// create the table again, should be ok if lake table schema match the fluss table schema
312+
catalog.createTable(lakeTablePath, table, false);
318313
}
319314

320315
@Test

fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,12 @@ protected Catalog getIcebergCatalog() {
8383
return icebergCatalog;
8484
}
8585

86+
@Override
87+
public TableDescriptor getTable(TablePath tablePath) throws TableNotExistException {
88+
throw new UnsupportedOperationException(
89+
"Get table is not supported for Iceberg at the moment");
90+
}
91+
8692
@Override
8793
public void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
8894
throws TableAlreadyExistException {

fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/LanceLakeCatalog.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,12 @@ public LanceLakeCatalog(Configuration config) {
4242
this.options = config;
4343
}
4444

45+
@Override
46+
public TableDescriptor getTable(TablePath tablePath) throws TableNotExistException {
47+
throw new UnsupportedOperationException(
48+
"Get table is not supported for Lance at the moment");
49+
}
50+
4551
@Override
4652
public void createTable(TablePath tablePath, TableDescriptor tableDescriptor) {
4753
// currently, we don't support primary key table for lance
Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.lake.paimon;
19+
20+
import org.apache.fluss.types.DataType;
21+
import org.apache.fluss.types.DataTypes;
22+
23+
import org.apache.paimon.types.ArrayType;
24+
import org.apache.paimon.types.BigIntType;
25+
import org.apache.paimon.types.BinaryType;
26+
import org.apache.paimon.types.BooleanType;
27+
import org.apache.paimon.types.CharType;
28+
import org.apache.paimon.types.DataField;
29+
import org.apache.paimon.types.DataTypeVisitor;
30+
import org.apache.paimon.types.DateType;
31+
import org.apache.paimon.types.DecimalType;
32+
import org.apache.paimon.types.DoubleType;
33+
import org.apache.paimon.types.FloatType;
34+
import org.apache.paimon.types.IntType;
35+
import org.apache.paimon.types.LocalZonedTimestampType;
36+
import org.apache.paimon.types.MapType;
37+
import org.apache.paimon.types.MultisetType;
38+
import org.apache.paimon.types.RowType;
39+
import org.apache.paimon.types.SmallIntType;
40+
import org.apache.paimon.types.TimeType;
41+
import org.apache.paimon.types.TimestampType;
42+
import org.apache.paimon.types.TinyIntType;
43+
import org.apache.paimon.types.VarBinaryType;
44+
import org.apache.paimon.types.VarCharType;
45+
import org.apache.paimon.types.VariantType;
46+
47+
/** Convert from Paimon's data type to Fluss's data type. */
48+
public class PaimonDataTypeToFlussDataType implements DataTypeVisitor<DataType> {
49+
50+
public static final PaimonDataTypeToFlussDataType INSTANCE =
51+
new PaimonDataTypeToFlussDataType();
52+
53+
@Override
54+
public DataType visit(CharType charType) {
55+
return withNullability(DataTypes.CHAR(charType.getLength()), charType.isNullable());
56+
}
57+
58+
@Override
59+
public DataType visit(VarCharType varCharType) {
60+
return withNullability(DataTypes.STRING(), varCharType.isNullable());
61+
}
62+
63+
@Override
64+
public DataType visit(BooleanType booleanType) {
65+
return withNullability(DataTypes.BOOLEAN(), booleanType.isNullable());
66+
}
67+
68+
@Override
69+
public DataType visit(BinaryType binaryType) {
70+
return withNullability(DataTypes.BINARY(binaryType.getLength()), binaryType.isNullable());
71+
}
72+
73+
@Override
74+
public DataType visit(VarBinaryType varBinaryType) {
75+
return withNullability(DataTypes.BYTES(), varBinaryType.isNullable());
76+
}
77+
78+
@Override
79+
public DataType visit(DecimalType decimalType) {
80+
return withNullability(
81+
DataTypes.DECIMAL(decimalType.getPrecision(), decimalType.getScale()),
82+
decimalType.isNullable());
83+
}
84+
85+
@Override
86+
public DataType visit(TinyIntType tinyIntType) {
87+
return withNullability(DataTypes.TINYINT(), tinyIntType.isNullable());
88+
}
89+
90+
@Override
91+
public DataType visit(SmallIntType smallIntType) {
92+
return withNullability(DataTypes.SMALLINT(), smallIntType.isNullable());
93+
}
94+
95+
@Override
96+
public DataType visit(IntType intType) {
97+
return withNullability(DataTypes.INT(), intType.isNullable());
98+
}
99+
100+
@Override
101+
public DataType visit(BigIntType bigIntType) {
102+
return withNullability(DataTypes.BIGINT(), bigIntType.isNullable());
103+
}
104+
105+
@Override
106+
public DataType visit(FloatType floatType) {
107+
return withNullability(DataTypes.FLOAT(), floatType.isNullable());
108+
}
109+
110+
@Override
111+
public DataType visit(DoubleType doubleType) {
112+
return withNullability(DataTypes.DOUBLE(), doubleType.isNullable());
113+
}
114+
115+
@Override
116+
public DataType visit(DateType dateType) {
117+
return withNullability(DataTypes.DATE(), dateType.isNullable());
118+
}
119+
120+
@Override
121+
public DataType visit(TimeType timeType) {
122+
return withNullability(DataTypes.TIME(), timeType.isNullable());
123+
}
124+
125+
@Override
126+
public DataType visit(TimestampType timestampType) {
127+
return withNullability(
128+
DataTypes.TIMESTAMP(timestampType.getPrecision()), timestampType.isNullable());
129+
}
130+
131+
@Override
132+
public DataType visit(LocalZonedTimestampType localZonedTimestampType) {
133+
return withNullability(
134+
DataTypes.TIMESTAMP_LTZ(localZonedTimestampType.getPrecision()),
135+
localZonedTimestampType.isNullable());
136+
}
137+
138+
@Override
139+
public DataType visit(VariantType variantType) {
140+
throw new UnsupportedOperationException("VariantType is not supported for Fluss.");
141+
}
142+
143+
@Override
144+
public DataType visit(ArrayType arrayType) {
145+
return withNullability(
146+
DataTypes.ARRAY(arrayType.getElementType().accept(this)), arrayType.isNullable());
147+
}
148+
149+
@Override
150+
public DataType visit(MultisetType multisetType) {
151+
throw new UnsupportedOperationException("MultisetType is not supported for Fluss.");
152+
}
153+
154+
@Override
155+
public DataType visit(MapType mapType) {
156+
return withNullability(
157+
DataTypes.MAP(
158+
mapType.getKeyType().accept(this), mapType.getValueType().accept(this)),
159+
mapType.isNullable());
160+
}
161+
162+
@Override
163+
public DataType visit(RowType rowType) {
164+
org.apache.fluss.types.RowType.Builder rowTypeBuilder =
165+
org.apache.fluss.types.RowType.builder();
166+
for (DataField field : rowType.getFields()) {
167+
rowTypeBuilder.field(field.name(), field.type().accept(this), field.description());
168+
}
169+
return withNullability(rowTypeBuilder.build(), rowType.isNullable());
170+
}
171+
172+
private DataType withNullability(DataType fluss, boolean nullable) {
173+
if (fluss.isNullable() != nullable) {
174+
return nullable ? fluss.nullable() : fluss.notNull();
175+
}
176+
return fluss;
177+
}
178+
}

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,19 @@
3434
import org.apache.paimon.options.Options;
3535
import org.apache.paimon.schema.Schema;
3636
import org.apache.paimon.schema.SchemaChange;
37+
import org.apache.paimon.table.Table;
3738
import org.apache.paimon.types.DataType;
3839
import org.apache.paimon.types.DataTypes;
3940

4041
import java.util.LinkedHashMap;
4142
import java.util.List;
43+
import java.util.Map;
4244

45+
import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toFlussTableDescriptor;
4346
import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
4447
import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimonSchema;
4548
import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimonSchemaChanges;
49+
import static org.apache.fluss.lake.paimon.utils.PaimonConversions.validatePaimonTableOptions;
4650
import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
4751
import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
4852
import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
@@ -76,6 +80,17 @@ protected Catalog getPaimonCatalog() {
7680
return paimonCatalog;
7781
}
7882

83+
@Override
84+
public TableDescriptor getTable(TablePath tablePath) throws TableNotExistException {
85+
try {
86+
Identifier paimonPath = toPaimon(tablePath);
87+
Table table = paimonCatalog.getTable(paimonPath);
88+
return toFlussTableDescriptor(table);
89+
} catch (Catalog.TableNotExistException e) {
90+
throw new TableNotExistException("Table " + tablePath + " not exists.");
91+
}
92+
}
93+
7994
@Override
8095
public void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
8196
throws TableAlreadyExistException {
@@ -114,6 +129,12 @@ public void alterTable(TablePath tablePath, List<TableChange> tableChanges)
114129
}
115130
}
116131

132+
@Override
133+
public void validateTablePropertyCompatibility(
134+
Map<String, String> existingProperties, Map<String, String> properties) {
135+
validatePaimonTableOptions(existingProperties, properties);
136+
}
137+
117138
private void createTable(Identifier tablePath, Schema schema)
118139
throws Catalog.DatabaseNotExistException {
119140
try {

0 commit comments

Comments
 (0)