1818package org .apache .fluss .lake .paimon ;
1919
2020import org .apache .fluss .annotation .VisibleForTesting ;
21+ import org .apache .fluss .config .ConfigOptions ;
2122import org .apache .fluss .config .Configuration ;
2223import org .apache .fluss .exception .TableAlreadyExistException ;
2324import org .apache .fluss .exception .TableNotExistException ;
2728import org .apache .fluss .metadata .TablePath ;
2829import org .apache .fluss .utils .IOUtils ;
2930
31+ import org .apache .paimon .CoreOptions ;
3032import org .apache .paimon .catalog .Catalog ;
3133import org .apache .paimon .catalog .CatalogContext ;
3234import org .apache .paimon .catalog .CatalogFactory ;
3335import org .apache .paimon .catalog .Identifier ;
3436import org .apache .paimon .options .Options ;
3537import org .apache .paimon .schema .Schema ;
3638import org .apache .paimon .schema .SchemaChange ;
39+ import org .apache .paimon .table .FileStoreTable ;
40+ import org .apache .paimon .table .Table ;
3741import org .apache .paimon .types .DataType ;
3842import org .apache .paimon .types .DataTypes ;
3943
4044import java .util .LinkedHashMap ;
4145import java .util .List ;
46+ import java .util .Map ;
4247
48+ import static org .apache .fluss .lake .paimon .utils .PaimonConversions .FLUSS_CONF_PREFIX ;
4349import static org .apache .fluss .lake .paimon .utils .PaimonConversions .toPaimon ;
4450import static org .apache .fluss .lake .paimon .utils .PaimonConversions .toPaimonSchema ;
4551import static org .apache .fluss .lake .paimon .utils .PaimonConversions .toPaimonSchemaChanges ;
@@ -83,12 +89,12 @@ public void createTable(TablePath tablePath, TableDescriptor tableDescriptor, Co
8389 Identifier paimonPath = toPaimon (tablePath );
8490 Schema paimonSchema = toPaimonSchema (tableDescriptor );
8591 try {
86- createTable (paimonPath , paimonSchema );
92+ createTable (paimonPath , paimonSchema , context . isCreatingFlussTable () );
8793 } catch (Catalog .DatabaseNotExistException e ) {
8894 // create database
8995 createDatabase (tablePath .getDatabaseName ());
9096 try {
91- createTable (paimonPath , paimonSchema );
97+ createTable (paimonPath , paimonSchema , context . isCreatingFlussTable () );
9298 } catch (Catalog .DatabaseNotExistException t ) {
9399 // shouldn't happen in normal cases
94100 throw new RuntimeException (
@@ -114,13 +120,31 @@ public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Cont
114120 }
115121 }
116122
117- private void createTable (Identifier tablePath , Schema schema )
123+ private void createTable (Identifier tablePath , Schema schema , boolean isCreatingFlussTable )
118124 throws Catalog .DatabaseNotExistException {
119125 try {
120126 // not ignore if table exists
121127 paimonCatalog .createTable (tablePath , schema , false );
122128 } catch (Catalog .TableAlreadyExistException e ) {
123- throw new TableAlreadyExistException ("Table " + tablePath + " already exists." );
129+ try {
130+ Table table = paimonCatalog .getTable (tablePath );
131+ FileStoreTable fileStoreTable = (FileStoreTable ) table ;
132+ validatePaimonSchemaCapability (
133+ tablePath , fileStoreTable .schema ().toSchema (), schema );
134+ // if creating a new fluss table, we should ensure the lake table is empty
135+ if (isCreatingFlussTable ) {
136+ checkTableIsEmpty (tablePath , fileStoreTable );
137+ }
138+ } catch (Catalog .TableNotExistException tableNotExistException ) {
139+ // shouldn't happen in normal cases
140+ throw new RuntimeException (
141+ String .format (
142+ "Failed to create table %s in Paimon. The table already existed "
143+ + "during the initial creation attempt, but subsequently "
144+ + "could not be found when trying to get it. "
145+ + "Please check whether the Paimon table was manually deleted, and try again." ,
146+ tablePath ));
147+ }
124148 }
125149 }
126150
@@ -142,6 +166,41 @@ private void alterTable(Identifier tablePath, List<SchemaChange> tableChanges)
142166 }
143167 }
144168
169+ private void validatePaimonSchemaCapability (
170+ Identifier tablePath , Schema existingSchema , Schema newSchema ) {
171+ // Adjust options for comparison
172+ Map <String , String > existingOptions = existingSchema .options ();
173+ Map <String , String > newOptions = newSchema .options ();
174+ // `path` will be set automatically by Paimon, so we need to remove it in existing options
175+ existingOptions .remove (CoreOptions .PATH .key ());
176+ // when enable datalake with an existing table, `table.datalake.enabled` will be `false`
177+ // in existing options, but `true` in new options.
178+ String datalakeConfigKey = FLUSS_CONF_PREFIX + ConfigOptions .TABLE_DATALAKE_ENABLED .key ();
179+ if (Boolean .FALSE .toString ().equalsIgnoreCase (existingOptions .get (datalakeConfigKey ))) {
180+ existingOptions .remove (datalakeConfigKey );
181+ newOptions .remove (datalakeConfigKey );
182+ }
183+
184+ if (!existingSchema .equals (newSchema )) {
185+ throw new TableAlreadyExistException (
186+ String .format (
187+ "The table %s already exists in Paimon catalog, but the table schema is not compatible. "
188+ + "Existing schema: %s, new schema: %s. "
189+ + "Please first drop the table in Paimon catalog or use a new table name." ,
190+ tablePath .getEscapedFullName (), existingSchema , newSchema ));
191+ }
192+ }
193+
194+ private void checkTableIsEmpty (Identifier tablePath , FileStoreTable table ) {
195+ if (table .latestSnapshot ().isPresent ()) {
196+ throw new TableAlreadyExistException (
197+ String .format (
198+ "The table %s already exists in Paimon catalog, and the table is not empty. "
199+ + "Please first drop the table in Paimon catalog or use a new table name." ,
200+ tablePath .getEscapedFullName ()));
201+ }
202+ }
203+
145204 @ Override
146205 public void close () {
147206 IOUtils .closeQuietly (paimonCatalog , "paimon catalog" );
0 commit comments