Skip to content

Commit 992c16e

Browse files
committed
Throw Exception If catalog default database is not existed.
1 parent 30dc6e3 commit 992c16e

File tree

3 files changed

+28
-10
lines changed

3 files changed

+28
-10
lines changed

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ public class FlinkCatalog extends AbstractCatalog {
108108
protected final ClassLoader classLoader;
109109

110110
protected final String catalogName;
111-
protected final @Nullable String defaultDatabase;
111+
protected final String defaultDatabase;
112112
protected final String bootstrapServers;
113113
private final Map<String, String> securityConfigs;
114114
protected Connection connection;
@@ -117,7 +117,7 @@ public class FlinkCatalog extends AbstractCatalog {
117117

118118
public FlinkCatalog(
119119
String name,
120-
@Nullable String defaultDatabase,
120+
String defaultDatabase,
121121
String bootstrapServers,
122122
ClassLoader classLoader,
123123
Map<String, String> securityConfigs) {
@@ -142,6 +142,10 @@ public void open() throws CatalogException {
142142

143143
connection = ConnectionFactory.createConnection(Configuration.fromMap(flussConfigs));
144144
admin = connection.getAdmin();
145+
if (!databaseExists(defaultDatabase)) {
146+
throw new CatalogException(
147+
String.format("Database %s does not exist in fluss server.", defaultDatabase));
148+
}
145149
}
146150

147151
@Override

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.flink.table.catalog.Catalog;
3434
import org.apache.flink.table.catalog.CatalogTable;
3535
import org.apache.flink.table.catalog.ObjectPath;
36+
import org.apache.flink.table.catalog.exceptions.CatalogException;
3637
import org.apache.flink.types.Row;
3738
import org.apache.flink.util.CloseableIterator;
3839
import org.apache.flink.util.CollectionUtil;
@@ -657,6 +658,20 @@ void testAuthentication() throws Exception {
657658
}
658659
}
659660

661+
@Test
662+
void createCatalogWithUnexistedDatabase() {
663+
assertThatThrownBy(
664+
() ->
665+
tEnv.executeSql(
666+
String.format(
667+
"create catalog test_non_exist_database_catalog with ('type' = 'fluss', '%s' = '%s', 'default-database' = 'non-exist')",
668+
BOOTSTRAP_SERVERS.key(),
669+
FLUSS_CLUSTER_EXTENSION.getBootstrapServers())))
670+
.rootCause()
671+
.isExactlyInstanceOf(CatalogException.class)
672+
.hasMessage("Database non-exist does not exist in fluss server.");
673+
}
674+
660675
private static void assertOptionsEqual(
661676
Map<String, String> actualOptions, Map<String, String> expectedOptions) {
662677
actualOptions.remove(ConfigOptions.BOOTSTRAP_SERVERS.key());

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ class FlinkCatalogTest {
8484
FlussClusterExtension.builder().setNumOfTabletServers(1).build();
8585

8686
private static final String CATALOG_NAME = "test-catalog";
87-
private static final String DEFAULT_DB = "default";
87+
private static final String DEFAULT_DB = FlinkCatalogOptions.DEFAULT_DATABASE.defaultValue();
8888
static Catalog catalog;
8989
private final ObjectPath tableInDefaultDb = new ObjectPath(DEFAULT_DB, "t1");
9090

@@ -407,8 +407,7 @@ void testDatabase() throws Exception {
407407
CatalogTable expectedTable = addOptions(table, addedOptions);
408408
checkEqualsRespectSchema((CatalogTable) tableCreated, expectedTable);
409409
assertThat(catalog.listTables("db1")).isEqualTo(Collections.singletonList("t1"));
410-
assertThat(catalog.listDatabases())
411-
.isEqualTo(Arrays.asList(DEFAULT_DB, "db1", "db2", "fluss"));
410+
assertThat(catalog.listDatabases()).isEqualTo(Arrays.asList("db1", "db2", DEFAULT_DB));
412411
// test drop db1;
413412
// should throw exception since db1 is not empty and we set cascade = false
414413
assertThatThrownBy(() -> catalog.dropDatabase("db1", false, false))
@@ -424,10 +423,10 @@ void testDatabase() throws Exception {
424423
// should be ok since we set ignoreIfNotExists = true
425424
catalog.dropDatabase("db1", true, true);
426425
// test list db
427-
assertThat(catalog.listDatabases()).isEqualTo(Arrays.asList(DEFAULT_DB, "db2", "fluss"));
426+
assertThat(catalog.listDatabases()).isEqualTo(Arrays.asList("db2", DEFAULT_DB));
428427
catalog.dropDatabase("db2", false, true);
429428
// should be empty
430-
assertThat(catalog.listDatabases()).isEqualTo(Arrays.asList(DEFAULT_DB, "fluss"));
429+
assertThat(catalog.listDatabases()).isEqualTo(Collections.singletonList(DEFAULT_DB));
431430
// should throw exception since the db is not exist and we set ignoreIfNotExists = false
432431
assertThatThrownBy(() -> catalog.listTables("unknown"))
433432
.isInstanceOf(DatabaseNotExistException.class)
@@ -465,7 +464,7 @@ void testOperatePartitions() throws Exception {
465464
catalog.createTable(path1, table, false);
466465
assertThatThrownBy(() -> catalog.listPartitions(path1))
467466
.isInstanceOf(TableNotPartitionedException.class)
468-
.hasMessage("Table default.t1 in catalog test-catalog is not partitioned.");
467+
.hasMessage("Table fluss.t1 in catalog test-catalog is not partitioned.");
469468

470469
// create partition table and list partitions.
471470
ObjectPath path2 = new ObjectPath(DEFAULT_DB, "partitioned_t1");
@@ -501,7 +500,7 @@ void testOperatePartitions() throws Exception {
501500
assertThatThrownBy(() -> catalog.listPartitions(path2, invalidTestSpec))
502501
.isInstanceOf(CatalogException.class)
503502
.hasMessage(
504-
"Failed to list partitions of table default.partitioned_t1 in test-catalog, by partitionSpec CatalogPartitionSpec{{second=}}");
503+
"Failed to list partitions of table fluss.partitioned_t1 in test-catalog, by partitionSpec CatalogPartitionSpec{{second=}}");
505504

506505
// NEW: Test dropPartition functionality
507506
CatalogPartitionSpec firstPartSpec = catalogPartitionSpecs.get(0);
@@ -520,7 +519,7 @@ void testOperatePartitions() throws Exception {
520519
.isInstanceOf(
521520
org.apache.flink.table.catalog.exceptions.PartitionNotExistException.class)
522521
.hasMessage(
523-
"Partition CatalogPartitionSpec{{first=999}} of table default.partitioned_t1 in catalog test-catalog does not exist.");
522+
"Partition CatalogPartitionSpec{{first=999}} of table fluss.partitioned_t1 in catalog test-catalog does not exist.");
524523

525524
// Should not throw with ignoreIfNotExists = true
526525
catalog.dropPartition(path2, nonExistentSpec, true);

0 commit comments

Comments
 (0)