Skip to content

Commit 3f8797c

Browse files
authored
[lake/paimon] Tolerate Paimon lake table existent with different insignificant options (#1995)
1 parent 64971b1 commit 3f8797c

File tree

3 files changed

+164
-40
lines changed

3 files changed

+164
-40
lines changed

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java

Lines changed: 3 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.fluss.lake.paimon;
1919

2020
import org.apache.fluss.annotation.VisibleForTesting;
21-
import org.apache.fluss.config.ConfigOptions;
2221
import org.apache.fluss.config.Configuration;
2322
import org.apache.fluss.exception.TableAlreadyExistException;
2423
import org.apache.fluss.exception.TableNotExistException;
@@ -28,7 +27,6 @@
2827
import org.apache.fluss.metadata.TablePath;
2928
import org.apache.fluss.utils.IOUtils;
3029

31-
import org.apache.paimon.CoreOptions;
3230
import org.apache.paimon.catalog.Catalog;
3331
import org.apache.paimon.catalog.CatalogContext;
3432
import org.apache.paimon.catalog.CatalogFactory;
@@ -43,12 +41,12 @@
4341

4442
import java.util.LinkedHashMap;
4543
import java.util.List;
46-
import java.util.Map;
4744

48-
import static org.apache.fluss.lake.paimon.utils.PaimonConversions.FLUSS_CONF_PREFIX;
4945
import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
5046
import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimonSchema;
5147
import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimonSchemaChanges;
48+
import static org.apache.fluss.lake.paimon.utils.PaimonTableValidation.checkTableIsEmpty;
49+
import static org.apache.fluss.lake.paimon.utils.PaimonTableValidation.validatePaimonSchemaCompatible;
5250
import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
5351
import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
5452
import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
@@ -129,7 +127,7 @@ private void createTable(Identifier tablePath, Schema schema, boolean isCreating
129127
try {
130128
Table table = paimonCatalog.getTable(tablePath);
131129
FileStoreTable fileStoreTable = (FileStoreTable) table;
132-
validatePaimonSchemaCapability(
130+
validatePaimonSchemaCompatible(
133131
tablePath, fileStoreTable.schema().toSchema(), schema);
134132
// if creating a new fluss table, we should ensure the lake table is empty
135133
if (isCreatingFlussTable) {
@@ -166,41 +164,6 @@ private void alterTable(Identifier tablePath, List<SchemaChange> tableChanges)
166164
}
167165
}
168166

169-
private void validatePaimonSchemaCapability(
170-
Identifier tablePath, Schema existingSchema, Schema newSchema) {
171-
// Adjust options for comparison
172-
Map<String, String> existingOptions = existingSchema.options();
173-
Map<String, String> newOptions = newSchema.options();
174-
// `path` will be set automatically by Paimon, so we need to remove it in existing options
175-
existingOptions.remove(CoreOptions.PATH.key());
176-
// when enable datalake with an existing table, `table.datalake.enabled` will be `false`
177-
// in existing options, but `true` in new options.
178-
String datalakeConfigKey = FLUSS_CONF_PREFIX + ConfigOptions.TABLE_DATALAKE_ENABLED.key();
179-
if (Boolean.FALSE.toString().equalsIgnoreCase(existingOptions.get(datalakeConfigKey))) {
180-
existingOptions.remove(datalakeConfigKey);
181-
newOptions.remove(datalakeConfigKey);
182-
}
183-
184-
if (!existingSchema.equals(newSchema)) {
185-
throw new TableAlreadyExistException(
186-
String.format(
187-
"The table %s already exists in Paimon catalog, but the table schema is not compatible. "
188-
+ "Existing schema: %s, new schema: %s. "
189-
+ "Please first drop the table in Paimon catalog or use a new table name.",
190-
tablePath.getEscapedFullName(), existingSchema, newSchema));
191-
}
192-
}
193-
194-
private void checkTableIsEmpty(Identifier tablePath, FileStoreTable table) {
195-
if (table.latestSnapshot().isPresent()) {
196-
throw new TableAlreadyExistException(
197-
String.format(
198-
"The table %s already exists in Paimon catalog, and the table is not empty. "
199-
+ "Please first drop the table in Paimon catalog or use a new table name.",
200-
tablePath.getEscapedFullName()));
201-
}
202-
}
203-
204167
@Override
205168
public void close() {
206169
IOUtils.closeQuietly(paimonCatalog, "paimon catalog");
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.lake.paimon.utils;
19+
20+
import org.apache.fluss.exception.TableAlreadyExistException;
21+
22+
import org.apache.paimon.CoreOptions;
23+
import org.apache.paimon.catalog.Identifier;
24+
import org.apache.paimon.options.ConfigOption;
25+
import org.apache.paimon.schema.Schema;
26+
import org.apache.paimon.table.FileStoreTable;
27+
28+
import java.lang.reflect.Field;
29+
import java.util.HashMap;
30+
import java.util.Map;
31+
32+
import static org.apache.fluss.lake.paimon.utils.PaimonConversions.FLUSS_CONF_PREFIX;
33+
34+
/** Utils to verify whether the existing Paimon table is compatible with the table to be created. */
35+
public class PaimonTableValidation {
36+
37+
private static final Map<String, ConfigOption<?>> PAIMON_CONFIGS = extractPaimonConfigs();
38+
39+
public static void validatePaimonSchemaCompatible(
40+
Identifier tablePath, Schema existingSchema, Schema newSchema) {
41+
// Adjust options for comparison
42+
Map<String, String> existingOptions = existingSchema.options();
43+
Map<String, String> newOptions = newSchema.options();
44+
45+
// when enable datalake with an existing table, `table.datalake.enabled` will be `false`
46+
// in existing options, but `true` in new options.
47+
String datalakeConfigKey =
48+
FLUSS_CONF_PREFIX
49+
+ org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_ENABLED.key();
50+
if (Boolean.FALSE.toString().equalsIgnoreCase(existingOptions.get(datalakeConfigKey))) {
51+
existingOptions.remove(datalakeConfigKey);
52+
newOptions.remove(datalakeConfigKey);
53+
}
54+
55+
// remove changeable options
56+
removeChangeableOptions(existingOptions);
57+
removeChangeableOptions(newOptions);
58+
59+
// ignore the existing options that are not in new options
60+
existingOptions.entrySet().removeIf(entry -> !newOptions.containsKey(entry.getKey()));
61+
62+
if (!existingSchema.equals(newSchema)) {
63+
throw new TableAlreadyExistException(
64+
String.format(
65+
"The table %s already exists in Paimon catalog, but the table schema is not compatible. "
66+
+ "Existing schema: %s, new schema: %s. "
67+
+ "Please first drop the table in Paimon catalog or use a new table name.",
68+
tablePath.getEscapedFullName(), existingSchema, newSchema));
69+
}
70+
}
71+
72+
private static void removeChangeableOptions(Map<String, String> options) {
73+
options.entrySet()
74+
.removeIf(
75+
entry ->
76+
// currently we take all Paimon options and Fluss option as
77+
// unchangeable.
78+
!PAIMON_CONFIGS.containsKey(entry.getKey())
79+
&& !entry.getKey().startsWith(FLUSS_CONF_PREFIX));
80+
}
81+
82+
public static void checkTableIsEmpty(Identifier tablePath, FileStoreTable table) {
83+
if (table.latestSnapshot().isPresent()) {
84+
throw new TableAlreadyExistException(
85+
String.format(
86+
"The table %s already exists in Paimon catalog, and the table is not empty. "
87+
+ "Please first drop the table in Paimon catalog or use a new table name.",
88+
tablePath.getEscapedFullName()));
89+
}
90+
}
91+
92+
private static Map<String, ConfigOption<?>> extractPaimonConfigs() {
93+
Map<String, ConfigOption<?>> options = new HashMap<>();
94+
95+
Field[] fields = CoreOptions.class.getFields();
96+
for (Field field : fields) {
97+
if (!ConfigOption.class.isAssignableFrom(field.getType())) {
98+
continue;
99+
}
100+
101+
try {
102+
ConfigOption<?> configOption = (ConfigOption<?>) field.get(null);
103+
options.put(configOption.key(), configOption);
104+
} catch (IllegalAccessException e) {
105+
throw new RuntimeException(
106+
"Unable to extract ConfigOption fields from CoreOptions class.", e);
107+
}
108+
}
109+
110+
return options;
111+
}
112+
}

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.apache.paimon.data.GenericRow;
4444
import org.apache.paimon.data.Timestamp;
4545
import org.apache.paimon.options.Options;
46+
import org.apache.paimon.schema.SchemaChange;
4647
import org.apache.paimon.table.Table;
4748
import org.apache.paimon.table.sink.BatchTableCommit;
4849
import org.apache.paimon.table.sink.BatchTableWrite;
@@ -476,6 +477,54 @@ void testCreateLakeEnableTableWithExistLakeTable() throws Exception {
476477
+ "Existing schema: UpdateSchema{fields=[`c1` STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(6) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1, fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}, "
477478
+ "new schema: UpdateSchema{fields=[`c1` STRING, `c2` INT, `c3` STRING, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(6) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1, fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}. "
478479
+ "Please first drop the table in Paimon catalog or use a new table name.");
480+
481+
// add an insignificant option to Paimon table will be ok
482+
Identifier paimonTablePath =
483+
Identifier.create(tablePath.getDatabaseName(), tablePath.getTableName());
484+
SchemaChange schemaChange1 = SchemaChange.setOption("any.k1", "any.v1");
485+
paimonCatalog.alterTable(paimonTablePath, Collections.singletonList(schemaChange1), false);
486+
admin.createTable(tablePath, td, false).get();
487+
admin.dropTable(tablePath, false).get();
488+
489+
// alter a Fluss option to Paimon table will throw exception
490+
SchemaChange schemaChange2 = SchemaChange.setOption("fluss.k1", "v2");
491+
paimonCatalog.alterTable(paimonTablePath, Collections.singletonList(schemaChange2), false);
492+
TableDescriptor finalTd = td;
493+
assertThatThrownBy(() -> admin.createTable(tablePath, finalTd, false).get())
494+
.cause()
495+
.isInstanceOf(LakeTableAlreadyExistException.class)
496+
.hasMessageContaining(
497+
"The table `fluss`.`log_table_with_exist_lake_table` already exists in Paimon catalog, "
498+
+ "but the table schema is not compatible.");
499+
500+
// reset fluss.k1 in Paimon
501+
SchemaChange schemaChange3 = SchemaChange.setOption("fluss.k1", "v1");
502+
paimonCatalog.alterTable(paimonTablePath, Collections.singletonList(schemaChange3), false);
503+
504+
// add a new Paimon option (not specified in the Fluss table) to Paimon table will be ok
505+
SchemaChange schemaChange4 =
506+
SchemaChange.setOption(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "2");
507+
paimonCatalog.alterTable(paimonTablePath, Collections.singletonList(schemaChange4), false);
508+
admin.createTable(tablePath, finalTd, false).get();
509+
admin.dropTable(tablePath, false).get();
510+
511+
// try to create a Fluss table specify a different value of exist Paimon option will throw
512+
// exception
513+
customProperties.put("paimon.snapshot.num-retained.min", "3");
514+
TableDescriptor td1 =
515+
createTableDescriptor(
516+
2,
517+
BUCKET_NUM,
518+
Collections.emptyList(),
519+
Collections.emptyList(),
520+
customProperties,
521+
false);
522+
assertThatThrownBy(() -> admin.createTable(tablePath, td1, false).get())
523+
.cause()
524+
.isInstanceOf(LakeTableAlreadyExistException.class)
525+
.hasMessageContaining(
526+
"The table `fluss`.`log_table_with_exist_lake_table` already exists in Paimon catalog, "
527+
+ "but the table schema is not compatible.");
479528
}
480529

481530
@Test

0 commit comments

Comments
 (0)