1818package org .apache .fluss .lake .paimon ;
1919
2020import org .apache .fluss .annotation .VisibleForTesting ;
21+ import org .apache .fluss .config .ConfigOptions ;
2122import org .apache .fluss .config .Configuration ;
2223import org .apache .fluss .exception .TableAlreadyExistException ;
2324import org .apache .fluss .exception .TableNotExistException ;
2728import org .apache .fluss .metadata .TablePath ;
2829import org .apache .fluss .utils .IOUtils ;
2930
31+ import org .apache .paimon .CoreOptions ;
3032import org .apache .paimon .catalog .Catalog ;
3133import org .apache .paimon .catalog .CatalogContext ;
3234import org .apache .paimon .catalog .CatalogFactory ;
3941import org .apache .paimon .types .DataType ;
4042import org .apache .paimon .types .DataTypes ;
4143
44+ import java .util .HashMap ;
4245import java .util .LinkedHashMap ;
4346import java .util .List ;
47+ import java .util .Map ;
4448
49+ import static org .apache .fluss .lake .paimon .utils .PaimonConversions .FLUSS_CONF_PREFIX ;
4550import static org .apache .fluss .lake .paimon .utils .PaimonConversions .toPaimon ;
4651import static org .apache .fluss .lake .paimon .utils .PaimonConversions .toPaimonSchema ;
4752import static org .apache .fluss .lake .paimon .utils .PaimonConversions .toPaimonSchemaChanges ;
48- import static org .apache .fluss .lake .paimon .utils .PaimonSchemaValidation .validatePaimonSchemaCapability ;
4953import static org .apache .fluss .metadata .TableDescriptor .BUCKET_COLUMN_NAME ;
5054import static org .apache .fluss .metadata .TableDescriptor .OFFSET_COLUMN_NAME ;
5155import static org .apache .fluss .metadata .TableDescriptor .TIMESTAMP_COLUMN_NAME ;
@@ -86,12 +90,12 @@ public void createTable(TablePath tablePath, TableDescriptor tableDescriptor, Co
8690 Identifier paimonPath = toPaimon (tablePath );
8791 Schema paimonSchema = toPaimonSchema (tableDescriptor );
8892 try {
89- createTable (paimonPath , paimonSchema );
93+ createTable (paimonPath , paimonSchema , context . isCreatingFlussTable () );
9094 } catch (Catalog .DatabaseNotExistException e ) {
9195 // create database
9296 createDatabase (tablePath .getDatabaseName ());
9397 try {
94- createTable (paimonPath , paimonSchema );
98+ createTable (paimonPath , paimonSchema , context . isCreatingFlussTable () );
9599 } catch (Catalog .DatabaseNotExistException t ) {
96100 // shouldn't happen in normal cases
97101 throw new RuntimeException (
@@ -117,7 +121,7 @@ public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Cont
117121 }
118122 }
119123
120- private void createTable (Identifier tablePath , Schema schema )
124+ private void createTable (Identifier tablePath , Schema schema , boolean isCreatingFlussTable )
121125 throws Catalog .DatabaseNotExistException {
122126 try {
123127 // not ignore if table exists
@@ -126,8 +130,14 @@ private void createTable(Identifier tablePath, Schema schema)
126130 try {
127131 Table table = paimonCatalog .getTable (tablePath );
128132 FileStoreTable fileStoreTable = (FileStoreTable ) table ;
129- validatePaimonSchemaCapability (fileStoreTable .schema ().toSchema (), schema );
133+ validatePaimonSchemaCapability (
134+ tablePath , fileStoreTable .schema ().toSchema (), schema );
135+ // if creating a new fluss table, we should ensure the lake table is empty
136+ if (isCreatingFlussTable ) {
137+ checkTableIsEmpty (tablePath , fileStoreTable );
138+ }
130139 } catch (Catalog .TableNotExistException tableNotExistException ) {
140+ // shouldn't happen in normal cases
131141 throw new RuntimeException (
132142 String .format (
133143 "Failed to create table %s in Paimon. The table already existed "
@@ -157,6 +167,67 @@ private void alterTable(Identifier tablePath, List<SchemaChange> tableChanges)
157167 }
158168 }
159169
170+ private void validatePaimonSchemaCapability (
171+ Identifier tablePath , Schema existingSchema , Schema newSchema ) {
172+ // Adjust options for comparison
173+ Map <String , String > existingOptions = new HashMap <>(existingSchema .options ());
174+ Map <String , String > newOptions = new HashMap <>(newSchema .options ());
175+ // `path` will be set automatically by Paimon, so we need to remove it in existing options
176+ existingOptions .remove (CoreOptions .PATH .key ());
177+ // when enable datalake with an existing table, `table.datalake.enabled` will be `false`
178+ // in existing options, but `true` in new options.
179+ String datalakeConfigKey = FLUSS_CONF_PREFIX + ConfigOptions .TABLE_DATALAKE_ENABLED .key ();
180+ if (Boolean .FALSE .toString ().equalsIgnoreCase (existingOptions .get (datalakeConfigKey ))) {
181+ existingOptions .remove (datalakeConfigKey );
182+ newOptions .remove (datalakeConfigKey );
183+ }
184+
185+ // Build schemas with adjusted options for comparison
186+ Schema adjustedExistingSchema = buildSchemaWithOptions (existingSchema , existingOptions );
187+ Schema adjustedNewSchema = buildSchemaWithOptions (newSchema , newOptions );
188+
189+ if (!adjustedExistingSchema .equals (adjustedNewSchema )) {
190+ throw new TableAlreadyExistException (
191+ String .format (
192+ "The table %s already exists in Paimon catalog, but the table schema is not compatible. "
193+ + "Existing schema: %s, new schema: %s. "
194+ + "Please first drop the table in Paimon catalog or use a new table name." ,
195+ tablePath .getEscapedFullName (),
196+ adjustedExistingSchema ,
197+ adjustedNewSchema ));
198+ }
199+ }
200+
201+ private Schema buildSchemaWithOptions (Schema schema , Map <String , String > options ) {
202+ Schema .Builder builder = Schema .newBuilder ();
203+ // Copy fields
204+ for (org .apache .paimon .types .DataField field : schema .fields ()) {
205+ builder .column (field .name (), field .type (), field .description ());
206+ }
207+ // Copy primary keys
208+ if (!schema .primaryKeys ().isEmpty ()) {
209+ builder .primaryKey (schema .primaryKeys ());
210+ }
211+ // Copy partition keys
212+ if (!schema .partitionKeys ().isEmpty ()) {
213+ builder .partitionKeys (schema .partitionKeys ());
214+ }
215+ // Set adjusted options
216+ builder .options (options );
217+ builder .comment (schema .comment ());
218+ return builder .build ();
219+ }
220+
221+ private void checkTableIsEmpty (Identifier tablePath , FileStoreTable table ) {
222+ if (!table .latestSnapshot ().isEmpty ()) {
223+ throw new TableAlreadyExistException (
224+ String .format (
225+ "The table %s already exists in Paimon catalog, and the table is not empty. "
226+ + "Please first drop the table in Paimon catalog or use a new table name." ,
227+ tablePath .getEscapedFullName ()));
228+ }
229+ }
230+
160231 @ Override
161232 public void close () {
162233 IOUtils .closeQuietly (paimonCatalog , "paimon catalog" );
0 commit comments