Skip to content

Commit 32003fc

Browse files
authored
[lake] Tolerate Paimon lake table existent if the schema and properties matches (#1847)
1 parent 5542274 commit 32003fc

File tree

13 files changed

+386
-82
lines changed

13 files changed

+386
-82
lines changed

fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,13 @@ default void close() throws Exception {
7575
@PublicEvolving
7676
interface Context {
7777

78+
/**
79+
* Whether the current operation is creating a fluss table.
80+
*
81+
* @return true if the current operation is creating a fluss table
82+
*/
83+
boolean isCreatingFlussTable();
84+
7885
/** Get the fluss principal currently accessing the catalog. */
7986
FlussPrincipal getFlussPrincipal();
8087
}

fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/TestingLakeCatalogContext.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@
2222
/** A testing implementation of {@link LakeCatalog.Context}. */
2323
public class TestingLakeCatalogContext implements LakeCatalog.Context {
2424

25+
@Override
26+
public boolean isCreatingFlussTable() {
27+
return false;
28+
}
29+
2530
@Override
2631
public FlussPrincipal getFlussPrincipal() {
2732
return null;

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -308,13 +308,9 @@ void testCreateAlreadyExistsLakeTable() throws Exception {
308308
assertThat(catalog.tableExists(lakeTablePath)).isTrue();
309309
// drop fluss table
310310
catalog.dropTable(lakeTablePath, false);
311-
// create the table again, should throw exception with ignore if exist = false
312-
assertThatThrownBy(() -> catalog.createTable(lakeTablePath, table, false))
313-
.isInstanceOf(CatalogException.class)
314-
.hasMessage(
315-
String.format(
316-
"The table %s already exists in %s catalog, please first drop the table in %s catalog or use a new table name.",
317-
lakeTablePath, "paimon", "paimon"));
311+
assertThat(catalog.tableExists(lakeTablePath)).isFalse();
312+
// create the table again should be ok, because the existing lake table is matched
313+
catalog.createTable(lakeTablePath, table, false);
318314
}
319315

320316
@Test

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java

Lines changed: 63 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.fluss.lake.paimon;
1919

2020
import org.apache.fluss.annotation.VisibleForTesting;
21+
import org.apache.fluss.config.ConfigOptions;
2122
import org.apache.fluss.config.Configuration;
2223
import org.apache.fluss.exception.TableAlreadyExistException;
2324
import org.apache.fluss.exception.TableNotExistException;
@@ -27,19 +28,24 @@
2728
import org.apache.fluss.metadata.TablePath;
2829
import org.apache.fluss.utils.IOUtils;
2930

31+
import org.apache.paimon.CoreOptions;
3032
import org.apache.paimon.catalog.Catalog;
3133
import org.apache.paimon.catalog.CatalogContext;
3234
import org.apache.paimon.catalog.CatalogFactory;
3335
import org.apache.paimon.catalog.Identifier;
3436
import org.apache.paimon.options.Options;
3537
import org.apache.paimon.schema.Schema;
3638
import org.apache.paimon.schema.SchemaChange;
39+
import org.apache.paimon.table.FileStoreTable;
40+
import org.apache.paimon.table.Table;
3741
import org.apache.paimon.types.DataType;
3842
import org.apache.paimon.types.DataTypes;
3943

4044
import java.util.LinkedHashMap;
4145
import java.util.List;
46+
import java.util.Map;
4247

48+
import static org.apache.fluss.lake.paimon.utils.PaimonConversions.FLUSS_CONF_PREFIX;
4349
import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
4450
import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimonSchema;
4551
import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimonSchemaChanges;
@@ -83,12 +89,12 @@ public void createTable(TablePath tablePath, TableDescriptor tableDescriptor, Co
8389
Identifier paimonPath = toPaimon(tablePath);
8490
Schema paimonSchema = toPaimonSchema(tableDescriptor);
8591
try {
86-
createTable(paimonPath, paimonSchema);
92+
createTable(paimonPath, paimonSchema, context.isCreatingFlussTable());
8793
} catch (Catalog.DatabaseNotExistException e) {
8894
// create database
8995
createDatabase(tablePath.getDatabaseName());
9096
try {
91-
createTable(paimonPath, paimonSchema);
97+
createTable(paimonPath, paimonSchema, context.isCreatingFlussTable());
9298
} catch (Catalog.DatabaseNotExistException t) {
9399
// shouldn't happen in normal cases
94100
throw new RuntimeException(
@@ -114,13 +120,31 @@ public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Cont
114120
}
115121
}
116122

117-
private void createTable(Identifier tablePath, Schema schema)
123+
private void createTable(Identifier tablePath, Schema schema, boolean isCreatingFlussTable)
118124
throws Catalog.DatabaseNotExistException {
119125
try {
120126
// not ignore if table exists
121127
paimonCatalog.createTable(tablePath, schema, false);
122128
} catch (Catalog.TableAlreadyExistException e) {
123-
throw new TableAlreadyExistException("Table " + tablePath + " already exists.");
129+
try {
130+
Table table = paimonCatalog.getTable(tablePath);
131+
FileStoreTable fileStoreTable = (FileStoreTable) table;
132+
validatePaimonSchemaCapability(
133+
tablePath, fileStoreTable.schema().toSchema(), schema);
134+
// if creating a new fluss table, we should ensure the lake table is empty
135+
if (isCreatingFlussTable) {
136+
checkTableIsEmpty(tablePath, fileStoreTable);
137+
}
138+
} catch (Catalog.TableNotExistException tableNotExistException) {
139+
// shouldn't happen in normal cases
140+
throw new RuntimeException(
141+
String.format(
142+
"Failed to create table %s in Paimon. The table already existed "
143+
+ "during the initial creation attempt, but subsequently "
144+
+ "could not be found when trying to get it. "
145+
+ "Please check whether the Paimon table was manually deleted, and try again.",
146+
tablePath));
147+
}
124148
}
125149
}
126150

@@ -142,6 +166,41 @@ private void alterTable(Identifier tablePath, List<SchemaChange> tableChanges)
142166
}
143167
}
144168

169+
private void validatePaimonSchemaCapability(
170+
Identifier tablePath, Schema existingSchema, Schema newSchema) {
171+
// Adjust options for comparison
172+
Map<String, String> existingOptions = existingSchema.options();
173+
Map<String, String> newOptions = newSchema.options();
174+
// `path` will be set automatically by Paimon, so we need to remove it in existing options
175+
existingOptions.remove(CoreOptions.PATH.key());
176+
// when enable datalake with an existing table, `table.datalake.enabled` will be `false`
177+
// in existing options, but `true` in new options.
178+
String datalakeConfigKey = FLUSS_CONF_PREFIX + ConfigOptions.TABLE_DATALAKE_ENABLED.key();
179+
if (Boolean.FALSE.toString().equalsIgnoreCase(existingOptions.get(datalakeConfigKey))) {
180+
existingOptions.remove(datalakeConfigKey);
181+
newOptions.remove(datalakeConfigKey);
182+
}
183+
184+
if (!existingSchema.equals(newSchema)) {
185+
throw new TableAlreadyExistException(
186+
String.format(
187+
"The table %s already exists in Paimon catalog, but the table schema is not compatible. "
188+
+ "Existing schema: %s, new schema: %s. "
189+
+ "Please first drop the table in Paimon catalog or use a new table name.",
190+
tablePath.getEscapedFullName(), existingSchema, newSchema));
191+
}
192+
}
193+
194+
private void checkTableIsEmpty(Identifier tablePath, FileStoreTable table) {
195+
if (table.latestSnapshot().isPresent()) {
196+
throw new TableAlreadyExistException(
197+
String.format(
198+
"The table %s already exists in Paimon catalog, and the table is not empty. "
199+
+ "Please first drop the table in Paimon catalog or use a new table name.",
200+
tablePath.getEscapedFullName()));
201+
}
202+
}
203+
145204
@Override
146205
public void close() {
147206
IOUtils.closeQuietly(paimonCatalog, "paimon catalog");

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/FlussDataTypeToPaimonDataType.java renamed to fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/FlussDataTypeToPaimonDataType.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.fluss.lake.paimon;
18+
package org.apache.fluss.lake.paimon.utils;
1919

2020
import org.apache.fluss.types.ArrayType;
2121
import org.apache.fluss.types.BigIntType;

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.apache.fluss.annotation.VisibleForTesting;
2121
import org.apache.fluss.exception.InvalidConfigException;
2222
import org.apache.fluss.exception.InvalidTableException;
23-
import org.apache.fluss.lake.paimon.FlussDataTypeToPaimonDataType;
2423
import org.apache.fluss.lake.paimon.source.FlussRowAsPaimonRow;
2524
import org.apache.fluss.metadata.TableChange;
2625
import org.apache.fluss.metadata.TableDescriptor;
@@ -57,7 +56,7 @@ public class PaimonConversions {
5756
private static final String PARTITION_GENERATE_LEGACY_NAME_OPTION_KEY = "partition.legacy-name";
5857

5958
// for fluss config
60-
private static final String FLUSS_CONF_PREFIX = "fluss.";
59+
public static final String FLUSS_CONF_PREFIX = "fluss.";
6160
// for paimon config
6261
private static final String PAIMON_CONF_PREFIX = "paimon.";
6362

@@ -67,6 +66,7 @@ public class PaimonConversions {
6766
static {
6867
PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.BUCKET.key());
6968
PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.BUCKET_KEY.key());
69+
PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.PATH.key());
7070
PAIMON_UNSETTABLE_OPTIONS.add(PARTITION_GENERATE_LEGACY_NAME_OPTION_KEY);
7171
}
7272

0 commit comments

Comments
 (0)