Skip to content

Commit 7b6ee5f

Browse files
committed
[server] Tolerate Paimon lake table existent if the schema and properties matches
1 parent 9c7c3ad commit 7b6ee5f

File tree

12 files changed

+451
-69
lines changed

12 files changed

+451
-69
lines changed

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -308,13 +308,9 @@ void testCreateAlreadyExistsLakeTable() throws Exception {
308308
assertThat(catalog.tableExists(lakeTablePath)).isTrue();
309309
// drop fluss table
310310
catalog.dropTable(lakeTablePath, false);
311-
// create the table again, should throw exception with ignore if exist = false
312-
assertThatThrownBy(() -> catalog.createTable(lakeTablePath, table, false))
313-
.isInstanceOf(CatalogException.class)
314-
.hasMessage(
315-
String.format(
316-
"The table %s already exists in %s catalog, please first drop the table in %s catalog or use a new table name.",
317-
lakeTablePath, "paimon", "paimon"));
311+
assertThat(catalog.tableExists(lakeTablePath)).isFalse();
312+
// create the table again should be ok, because the existing lake table is matched
313+
catalog.createTable(lakeTablePath, table, false);
318314
}
319315

320316
@Test

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
import org.apache.paimon.options.Options;
3535
import org.apache.paimon.schema.Schema;
3636
import org.apache.paimon.schema.SchemaChange;
37+
import org.apache.paimon.table.FileStoreTable;
38+
import org.apache.paimon.table.Table;
3739
import org.apache.paimon.types.DataType;
3840
import org.apache.paimon.types.DataTypes;
3941

@@ -43,6 +45,7 @@
4345
import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
4446
import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimonSchema;
4547
import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimonSchemaChanges;
48+
import static org.apache.fluss.lake.paimon.utils.PaimonSchemaValidation.validatePaimonSchemaCapability;
4649
import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
4750
import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
4851
import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
@@ -120,7 +123,19 @@ private void createTable(Identifier tablePath, Schema schema)
120123
// not ignore if table exists
121124
paimonCatalog.createTable(tablePath, schema, false);
122125
} catch (Catalog.TableAlreadyExistException e) {
123-
throw new TableAlreadyExistException("Table " + tablePath + " already exists.");
126+
try {
127+
Table table = paimonCatalog.getTable(tablePath);
128+
FileStoreTable fileStoreTable = (FileStoreTable) table;
129+
validatePaimonSchemaCapability(fileStoreTable.schema().toSchema(), schema);
130+
} catch (Catalog.TableNotExistException tableNotExistException) {
131+
throw new RuntimeException(
132+
String.format(
133+
"Failed to create table %s in Paimon. The table already existed "
134+
+ "during the initial creation attempt, but subsequently "
135+
+ "could not be found when trying to get it. "
136+
+ "Please check whether the Paimon table was manually deleted, and try again.",
137+
tablePath));
138+
}
124139
}
125140
}
126141

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/FlussDataTypeToPaimonDataType.java renamed to fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/FlussDataTypeToPaimonDataType.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.fluss.lake.paimon;
18+
package org.apache.fluss.lake.paimon.utils;
1919

2020
import org.apache.fluss.types.ArrayType;
2121
import org.apache.fluss.types.BigIntType;

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.apache.fluss.annotation.VisibleForTesting;
2121
import org.apache.fluss.exception.InvalidConfigException;
2222
import org.apache.fluss.exception.InvalidTableException;
23-
import org.apache.fluss.lake.paimon.FlussDataTypeToPaimonDataType;
2423
import org.apache.fluss.lake.paimon.source.FlussRowAsPaimonRow;
2524
import org.apache.fluss.metadata.TableChange;
2625
import org.apache.fluss.metadata.TableDescriptor;
@@ -57,7 +56,7 @@ public class PaimonConversions {
5756
private static final String PARTITION_GENERATE_LEGACY_NAME_OPTION_KEY = "partition.legacy-name";
5857

5958
// for fluss config
60-
private static final String FLUSS_CONF_PREFIX = "fluss.";
59+
public static final String FLUSS_CONF_PREFIX = "fluss.";
6160
// for paimon config
6261
private static final String PAIMON_CONF_PREFIX = "paimon.";
6362

@@ -67,6 +66,7 @@ public class PaimonConversions {
6766
static {
6867
PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.BUCKET.key());
6968
PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.BUCKET_KEY.key());
69+
PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.PATH.key());
7070
PAIMON_UNSETTABLE_OPTIONS.add(PARTITION_GENERATE_LEGACY_NAME_OPTION_KEY);
7171
}
7272

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.lake.paimon.utils;
19+
20+
import org.apache.fluss.config.ConfigOptions;
21+
import org.apache.fluss.exception.TableAlreadyExistException;
22+
23+
import org.apache.paimon.CoreOptions;
24+
import org.apache.paimon.schema.Schema;
25+
26+
import java.util.HashMap;
27+
import java.util.Map;
28+
29+
import static org.apache.fluss.lake.paimon.utils.PaimonConversions.FLUSS_CONF_PREFIX;
30+
31+
/** Validator of Paimon table schema. */
32+
public class PaimonSchemaValidation {
33+
34+
public static void validatePaimonSchemaCapability(Schema existingSchema, Schema newSchema) {
35+
// check fields
36+
if (!existingSchema.fields().equals(newSchema.fields())) {
37+
throw new TableAlreadyExistException(
38+
String.format(
39+
"The fields of the existing Paimon table are not compatible with those of the new table to be created. "
40+
+ "Existing fields: %s, new fields: %s.",
41+
existingSchema.fields(), newSchema.fields()));
42+
}
43+
44+
// check pks
45+
if (!existingSchema.primaryKeys().equals(newSchema.primaryKeys())) {
46+
throw new TableAlreadyExistException(
47+
String.format(
48+
"The primary keys of the existing Paimon table are not compatible with those of the new table to be created. "
49+
+ "Existing primary keys: %s, new primary keys: %s.",
50+
existingSchema.primaryKeys(), newSchema.primaryKeys()));
51+
}
52+
53+
// check partition keys
54+
if (!existingSchema.partitionKeys().equals(newSchema.partitionKeys())) {
55+
throw new TableAlreadyExistException(
56+
String.format(
57+
"The partition keys of the existing Paimon table are not compatible with those of the new table to be created. "
58+
+ "Existing partition keys: %s, new partition keys: %s.",
59+
existingSchema.partitionKeys(), newSchema.partitionKeys()));
60+
}
61+
62+
// check options
63+
Map<String, String> existingOptions = new HashMap<>(existingSchema.options());
64+
Map<String, String> newOptions = new HashMap<>(newSchema.options());
65+
// `path` will be set automatically by Paimon, so we need to remove it here
66+
existingOptions.remove(CoreOptions.PATH.key());
67+
// when enable datalake with an existing table, we should ignore the
68+
// `table.datalake.enabled`
69+
String datalakeConfigKey = FLUSS_CONF_PREFIX + ConfigOptions.TABLE_DATALAKE_ENABLED.key();
70+
if (Boolean.FALSE.toString().equals(existingOptions.get(datalakeConfigKey))) {
71+
existingOptions.remove(datalakeConfigKey);
72+
newOptions.remove(datalakeConfigKey);
73+
}
74+
if (!existingOptions.equals(newOptions)) {
75+
throw new TableAlreadyExistException(
76+
String.format(
77+
"The options of the existing Paimon table are not compatible with those of the new table to be created. "
78+
+ "Existing options: %s, new options: %s.",
79+
existingOptions, newSchema.options()));
80+
}
81+
}
82+
}

0 commit comments

Comments
 (0)