Skip to content

Commit c79c62f

Browse files
committed
[server] Tolerate Paimon lake table existent if the schema and properties matches
1 parent 6003c42 commit c79c62f

File tree

12 files changed

+438
-62
lines changed

12 files changed

+438
-62
lines changed

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -308,13 +308,9 @@ void testCreateAlreadyExistsLakeTable() throws Exception {
308308
assertThat(catalog.tableExists(lakeTablePath)).isTrue();
309309
// drop fluss table
310310
catalog.dropTable(lakeTablePath, false);
311-
// create the table again, should throw exception with ignore if exist = false
312-
assertThatThrownBy(() -> catalog.createTable(lakeTablePath, table, false))
313-
.isInstanceOf(CatalogException.class)
314-
.hasMessage(
315-
String.format(
316-
"The table %s already exists in %s catalog, please first drop the table in %s catalog or use a new table name.",
317-
lakeTablePath, "paimon", "paimon"));
311+
assertThat(catalog.tableExists(lakeTablePath)).isFalse();
312+
// create the table again
313+
catalog.createTable(lakeTablePath, table, false);
318314
}
319315

320316
@Test

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
import org.apache.paimon.options.Options;
3535
import org.apache.paimon.schema.Schema;
3636
import org.apache.paimon.schema.SchemaChange;
37+
import org.apache.paimon.table.FileStoreTable;
38+
import org.apache.paimon.table.Table;
3739
import org.apache.paimon.types.DataType;
3840
import org.apache.paimon.types.DataTypes;
3941

@@ -43,6 +45,7 @@
4345
import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
4446
import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimonSchema;
4547
import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimonSchemaChanges;
48+
import static org.apache.fluss.lake.paimon.utils.PaimonSchemaValidation.validatePaimonSchemaCapability;
4649
import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
4750
import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
4851
import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
@@ -120,7 +123,13 @@ private void createTable(Identifier tablePath, Schema schema)
120123
// not ignore if table exists
121124
paimonCatalog.createTable(tablePath, schema, false);
122125
} catch (Catalog.TableAlreadyExistException e) {
123-
throw new TableAlreadyExistException("Table " + tablePath + " already exists.");
126+
try {
127+
Table table = paimonCatalog.getTable(tablePath);
128+
FileStoreTable fileStoreTable = (FileStoreTable) table;
129+
validatePaimonSchemaCapability(fileStoreTable.schema().toSchema(), schema);
130+
} catch (Catalog.TableNotExistException tableNotExistException) {
131+
throw new RuntimeException("Table " + tablePath + " not exists.");
132+
}
124133
}
125134
}
126135

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/FlussDataTypeToPaimonDataType.java renamed to fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/FlussDataTypeToPaimonDataType.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.fluss.lake.paimon;
18+
package org.apache.fluss.lake.paimon.utils;
1919

2020
import org.apache.fluss.types.ArrayType;
2121
import org.apache.fluss.types.BigIntType;

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.apache.fluss.annotation.VisibleForTesting;
2121
import org.apache.fluss.exception.InvalidConfigException;
2222
import org.apache.fluss.exception.InvalidTableException;
23-
import org.apache.fluss.lake.paimon.FlussDataTypeToPaimonDataType;
2423
import org.apache.fluss.lake.paimon.source.FlussRowAsPaimonRow;
2524
import org.apache.fluss.metadata.TableChange;
2625
import org.apache.fluss.metadata.TableDescriptor;
@@ -60,6 +59,7 @@ public class PaimonConversions {
6059
static {
6160
PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.BUCKET.key());
6261
PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.BUCKET_KEY.key());
62+
PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.PATH.key());
6363
PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.PARTITION_GENERATE_LEGCY_NAME.key());
6464
}
6565

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.lake.paimon.utils;
19+
20+
import org.apache.fluss.exception.TableAlreadyExistException;
21+
22+
import org.apache.paimon.CoreOptions;
23+
import org.apache.paimon.schema.Schema;
24+
25+
import java.util.HashMap;
26+
import java.util.Map;
27+
28+
/** Validator of Paimon table schema. */
29+
public class PaimonSchemaValidation {
30+
31+
public static void validatePaimonSchemaCapability(Schema existingSchema, Schema newSchema) {
32+
// check fields
33+
if (!existingSchema.fields().equals(newSchema.fields())) {
34+
throw new TableAlreadyExistException(
35+
String.format(
36+
"The fields of the existing Paimon table are not compatible with those of the new table to be created. "
37+
+ "Existing fields: %s, new fields: %s.",
38+
existingSchema.fields(), newSchema.fields()));
39+
}
40+
41+
// check pks
42+
if (!existingSchema.primaryKeys().equals(newSchema.primaryKeys())) {
43+
throw new TableAlreadyExistException(
44+
String.format(
45+
"The primary keys of the existing Paimon table are not compatible with those of the new table to be created. "
46+
+ "Existing primary keys: %s, new primary keys: %s.",
47+
existingSchema.primaryKeys(), newSchema.primaryKeys()));
48+
}
49+
50+
// check partition keys
51+
if (!existingSchema.partitionKeys().equals(newSchema.partitionKeys())) {
52+
throw new TableAlreadyExistException(
53+
String.format(
54+
"The partition keys of the existing Paimon table are not compatible with those of the new table to be created. "
55+
+ "Existing partition keys: %s, new partition keys: %s.",
56+
existingSchema.partitionKeys(), newSchema.partitionKeys()));
57+
}
58+
59+
// check options
60+
Map<String, String> existingOptions = new HashMap<>(existingSchema.options());
61+
// `path` will be set automatically by Paimon, so we need to remove it here
62+
existingOptions.remove(CoreOptions.PATH.key());
63+
if (!existingOptions.equals(newSchema.options())) {
64+
throw new TableAlreadyExistException(
65+
String.format(
66+
"The options of the existing Paimon table are not compatible with those of the new table to be created. "
67+
+ "Existing options: %s, new options: %s.",
68+
existingOptions, newSchema.options()));
69+
}
70+
}
71+
}

0 commit comments

Comments
 (0)