Skip to content

Commit 71ac6ba

Browse files
committed
Throw Exception If catalog default database is not existed or authorized.
1 parent 1f9de70 commit 71ac6ba

File tree

3 files changed

+53
-4
lines changed

3 files changed

+53
-4
lines changed

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ public class FlinkCatalog extends AbstractCatalog {
108108
protected final ClassLoader classLoader;
109109

110110
protected final String catalogName;
111-
protected final @Nullable String defaultDatabase;
111+
protected final String defaultDatabase;
112112
protected final String bootstrapServers;
113113
private final Map<String, String> securityConfigs;
114114
protected Connection connection;
@@ -117,7 +117,7 @@ public class FlinkCatalog extends AbstractCatalog {
117117

118118
public FlinkCatalog(
119119
String name,
120-
@Nullable String defaultDatabase,
120+
String defaultDatabase,
121121
String bootstrapServers,
122122
ClassLoader classLoader,
123123
Map<String, String> securityConfigs) {
@@ -142,6 +142,10 @@ public void open() throws CatalogException {
142142

143143
connection = ConnectionFactory.createConnection(Configuration.fromMap(flussConfigs));
144144
admin = connection.getAdmin();
145+
if (!databaseExists(defaultDatabase)) {
146+
throw new CatalogException(
147+
String.format("Database %s does not exist in fluss server.", defaultDatabase));
148+
}
145149
}
146150

147151
@Override

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.fluss.cluster.ServerNode;
2121
import org.apache.fluss.config.ConfigOptions;
2222
import org.apache.fluss.config.Configuration;
23+
import org.apache.fluss.exception.AuthorizationException;
2324
import org.apache.fluss.exception.InvalidTableException;
2425
import org.apache.fluss.metadata.TablePath;
2526
import org.apache.fluss.server.testutils.FlussClusterExtension;
@@ -33,6 +34,7 @@
3334
import org.apache.flink.table.catalog.Catalog;
3435
import org.apache.flink.table.catalog.CatalogTable;
3536
import org.apache.flink.table.catalog.ObjectPath;
37+
import org.apache.flink.table.catalog.exceptions.CatalogException;
3638
import org.apache.flink.types.Row;
3739
import org.apache.flink.util.CloseableIterator;
3840
import org.apache.flink.util.CollectionUtil;
@@ -54,6 +56,7 @@
5456
import java.util.Map;
5557
import java.util.stream.Collectors;
5658

59+
import static org.apache.fluss.config.ConfigOptions.AUTHORIZER_ENABLED;
5760
import static org.apache.fluss.config.ConfigOptions.DEFAULT_LISTENER_NAME;
5861
import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
5962
import static org.apache.fluss.flink.FlinkConnectorOptions.BUCKET_KEY;
@@ -594,12 +597,13 @@ void testAuthentication() throws Exception {
594597
Configuration serverConfig = new Configuration();
595598
serverConfig.setString(ConfigOptions.SERVER_SECURITY_PROTOCOL_MAP.key(), "CLIENT:sasl");
596599
serverConfig.setString("security.sasl.enabled.mechanisms", "plain");
600+
serverConfig.setString(AUTHORIZER_ENABLED.key(), "true");
597601
serverConfig.setString(
598602
"security.sasl.plain.jaas.config",
599603
"org.apache.fluss.security.auth.sasl.plain.PlainLoginModule required "
600604
+ " user_root=\"password\" "
601605
+ " user_guest=\"password2\";");
602-
serverConfig.setString(ConfigOptions.SUPER_USERS.key(), "USER:root");
606+
serverConfig.setString(ConfigOptions.SUPER_USERS.key(), "User:root");
603607
FlussClusterExtension flussClusterExtension =
604608
FlussClusterExtension.builder()
605609
.setCoordinatorServerListeners(
@@ -613,13 +617,34 @@ void testAuthentication() throws Exception {
613617
.setClusterConf(serverConfig)
614618
.build();
615619
Catalog authenticateCatalog = null;
620+
616621
try {
617622
flussClusterExtension.start();
618623
ServerNode coordinatorServerNode =
619624
flussClusterExtension.getCoordinatorServerNode(clientListenerName);
620625
String bootstrapServers =
621626
String.format(
622627
"%s:%d", coordinatorServerNode.host(), coordinatorServerNode.port());
628+
629+
assertThatThrownBy(
630+
() ->
631+
tEnv.executeSql(
632+
String.format(
633+
"create catalog test_non_authorization_catalog with ('type' = 'fluss', "
634+
+ "'%s' = '%s', "
635+
+ "'default-database' = '%s', "
636+
+ "'client.security.protocol' = 'sasl',"
637+
+ "'client.security.sasl.username' = 'guest', "
638+
+ "'client.security.sasl.password' = 'password2' "
639+
+ " )",
640+
BOOTSTRAP_SERVERS.key(),
641+
bootstrapServers,
642+
DEFAULT_DB)))
643+
.rootCause()
644+
.isExactlyInstanceOf(AuthorizationException.class)
645+
.hasMessageContaining(
646+
"Principal FlussPrincipal{name='guest', type='User'} have no authorization to operate DESCRIBE on resource Resource{type=DATABASE, name='fluss'}");
647+
623648
authenticateCatalog =
624649
new FlinkCatalog(
625650
CATALOG_NAME,
@@ -635,7 +660,6 @@ void testAuthentication() throws Exception {
635660

636661
Map<String, String> clientConfig = new HashMap<>();
637662
clientConfig.put(ConfigOptions.CLIENT_SECURITY_PROTOCOL.key(), "sasl");
638-
clientConfig.put(ConfigOptions.CLIENT_SASL_MECHANISM.key(), "plain");
639663
clientConfig.put("client.security.sasl.username", "root");
640664
clientConfig.put("client.security.sasl.password", "password");
641665
authenticateCatalog =
@@ -657,6 +681,20 @@ void testAuthentication() throws Exception {
657681
}
658682
}
659683

684+
@Test
685+
void createCatalogWithUnexistedDatabase() {
686+
assertThatThrownBy(
687+
() ->
688+
tEnv.executeSql(
689+
String.format(
690+
"create catalog test_non_exist_database_catalog with ('type' = 'fluss', '%s' = '%s', 'default-database' = 'non-exist')",
691+
BOOTSTRAP_SERVERS.key(),
692+
FLUSS_CLUSTER_EXTENSION.getBootstrapServers())))
693+
.rootCause()
694+
.isExactlyInstanceOf(CatalogException.class)
695+
.hasMessage("Database non-exist does not exist in fluss server.");
696+
}
697+
660698
private static void assertOptionsEqual(
661699
Map<String, String> actualOptions, Map<String, String> expectedOptions) {
662700
actualOptions.remove(ConfigOptions.BOOTSTRAP_SERVERS.key());

fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,13 @@ public CompletableFuture<GetDatabaseInfoResponse> getDatabaseInfo(
219219

220220
@Override
221221
public CompletableFuture<DatabaseExistsResponse> databaseExists(DatabaseExistsRequest request) {
222+
if (authorizer != null) {
223+
authorizer.authorize(
224+
currentSession(),
225+
OperationType.DESCRIBE,
226+
Resource.database(request.getDatabaseName()));
227+
}
228+
222229
DatabaseExistsResponse response = new DatabaseExistsResponse();
223230
boolean exists = metadataManager.databaseExists(request.getDatabaseName());
224231
response.setExists(exists);

0 commit comments

Comments
 (0)