Skip to content

Commit 23f6b1c

Browse files
[server] Fix SASL createPrincipal mismatch username and type (#1099)
1 parent 7f0292e commit 23f6b1c

File tree

12 files changed

+90
-193
lines changed

12 files changed

+90
-193
lines changed

fluss-client/src/test/java/com/alibaba/fluss/client/security/acl/FlussAuthorizationITCase.java

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -102,19 +102,20 @@ public class FlussAuthorizationITCase {
102102
@BeforeEach
103103
protected void setup() throws Exception {
104104
Configuration conf = FLUSS_CLUSTER_EXTENSION.getClientConfig("CLIENT");
105-
conf.set(ConfigOptions.CLIENT_SECURITY_PROTOCOL, "username_password");
105+
conf.set(ConfigOptions.CLIENT_SECURITY_PROTOCOL, "sasl");
106+
conf.set(ConfigOptions.CLIENT_SASL_MECHANISM, "plain");
106107
Configuration rootConf = new Configuration(conf);
107-
rootConf.setString("client.security.username_password.username", "root");
108-
rootConf.setString("client.security.username_password.password", "password");
108+
rootConf.setString("client.security.sasl.username", "root");
109+
rootConf.setString("client.security.sasl.password", "password");
109110
rootConn = ConnectionFactory.createConnection(rootConf);
110111
rootAdmin = rootConn.getAdmin();
111112

112113
guestConf = new Configuration(conf);
113-
guestConf.setString("client.security.username_password.username", "guest");
114-
guestConf.setString("client.security.username_password.password", "password2");
114+
guestConf.setString("client.security.sasl.username", "guest");
115+
guestConf.setString("client.security.sasl.password", "password2");
115116
guestConn = ConnectionFactory.createConnection(guestConf);
116117
guestAdmin = guestConn.getAdmin();
117-
guestPrincipal = new FlussPrincipal("guest", "USER");
118+
guestPrincipal = new FlussPrincipal("guest", "User");
118119

119120
// prepare default database and table
120121
rootAdmin
@@ -164,9 +165,10 @@ void testNoAuthorizer() throws Exception {
164165
try {
165166
flussClusterExtension.start();
166167
Configuration conf = new Configuration(flussClusterExtension.getClientConfig("CLIENT"));
167-
conf.set(ConfigOptions.CLIENT_SECURITY_PROTOCOL, "username_password");
168-
conf.setString("client.security.username_password.username", "root");
169-
conf.setString("client.security.username_password.password", "password");
168+
conf.set(ConfigOptions.CLIENT_SECURITY_PROTOCOL, "sasl");
169+
conf.set(ConfigOptions.CLIENT_SASL_MECHANISM, "plain");
170+
conf.setString("client.security.sasl.username", "root");
171+
conf.setString("client.security.sasl.password", "password");
170172
try (Connection connection = ConnectionFactory.createConnection(conf);
171173
Admin admin = connection.getAdmin()) {
172174
assertThatThrownBy(
@@ -230,7 +232,7 @@ void testAclOperation() throws Exception {
230232
assertThat(guestAdmin.listAcls(AclBindingFilter.ANY).get()).hasSize(1);
231233

232234
// test whether the user have authorization to operate create and drop acls.
233-
FlussPrincipal user1 = new FlussPrincipal("user1", "USER");
235+
FlussPrincipal user1 = new FlussPrincipal("user1", "User");
234236
AclBinding user1AclBinding =
235237
new AclBinding(
236238
Resource.table("test_db", "test_table"),
@@ -684,10 +686,14 @@ private static Configuration initConfig() {
684686
conf.set(ConfigOptions.CLIENT_WRITER_BATCH_SIZE, MemorySize.parse("1kb"));
685687

686688
// set security information.
689+
conf.setString(ConfigOptions.SERVER_SECURITY_PROTOCOL_MAP.key(), "CLIENT:sasl");
690+
conf.setString("security.sasl.enabled.mechanisms", "plain");
687691
conf.setString(
688-
ConfigOptions.SERVER_SECURITY_PROTOCOL_MAP.key(), "CLIENT:username_password");
689-
conf.setString("security.username_password.credentials", "root:password,guest:password2");
690-
conf.set(ConfigOptions.SUPER_USERS, "USER:root");
692+
"security.sasl.plain.jaas.config",
693+
"com.alibaba.fluss.security.auth.sasl.plain.PlainLoginModule required "
694+
+ " user_root=\"password\" "
695+
+ " user_guest=\"password2\";");
696+
conf.set(ConfigOptions.SUPER_USERS, "User:root");
691697
conf.set(ConfigOptions.AUTHORIZER_ENABLED, true);
692698
return conf;
693699
}

fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1092,7 +1092,7 @@ public class ConfigOptions {
10921092
"Enable metrics for client. When metrics is enabled, the client "
10931093
+ "will collect metrics and report by the JMX metrics reporter.");
10941094

1095-
public static final ConfigOption<String> CLIENT_MECHANISM =
1095+
public static final ConfigOption<String> CLIENT_SASL_MECHANISM =
10961096
key("client.security.sasl.mechanism")
10971097
.stringType()
10981098
.defaultValue("PLAIN")

fluss-common/src/main/java/com/alibaba/fluss/security/auth/sasl/authenticator/SaslClientAuthenticator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@
2929

3030
import java.util.Map;
3131

32-
import static com.alibaba.fluss.config.ConfigOptions.CLIENT_MECHANISM;
3332
import static com.alibaba.fluss.config.ConfigOptions.CLIENT_SASL_JAAS_CONFIG;
3433
import static com.alibaba.fluss.config.ConfigOptions.CLIENT_SASL_JAAS_PASSWORD;
3534
import static com.alibaba.fluss.config.ConfigOptions.CLIENT_SASL_JAAS_USERNAME;
35+
import static com.alibaba.fluss.config.ConfigOptions.CLIENT_SASL_MECHANISM;
3636
import static com.alibaba.fluss.security.auth.sasl.jaas.SaslServerFactory.createSaslClient;
3737

3838
/** An authenticator that uses SASL to authenticate with a server. */
@@ -47,7 +47,7 @@ public class SaslClientAuthenticator implements ClientAuthenticator {
4747
private LoginManager loginManager;
4848

4949
public SaslClientAuthenticator(Configuration configuration) {
50-
this.mechanism = configuration.get(CLIENT_MECHANISM).toUpperCase();
50+
this.mechanism = configuration.get(CLIENT_SASL_MECHANISM).toUpperCase();
5151
String jaasConfigStr = configuration.getString(CLIENT_SASL_JAAS_CONFIG);
5252
if (jaasConfigStr == null && mechanism.equals(PlainSaslServer.PLAIN_MECHANISM)) {
5353
String username = configuration.get(CLIENT_SASL_JAAS_USERNAME);

fluss-common/src/main/java/com/alibaba/fluss/security/auth/sasl/authenticator/SaslServerAuthenticator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,6 @@ public boolean isCompleted() {
147147

148148
@Override
149149
public FlussPrincipal createPrincipal() {
150-
return new FlussPrincipal("User", saslServer.getAuthorizationID());
150+
return new FlussPrincipal(saslServer.getAuthorizationID(), "User");
151151
}
152152
}

fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/catalog/FlinkCatalogFactoryTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,10 @@ public void testCreateCatalog() {
6969

7070
// test security configs
7171
Map<String, String> securityMap = new HashMap<>();
72-
securityMap.put(ConfigOptions.CLIENT_SECURITY_PROTOCOL.key(), "username_password");
73-
securityMap.put("client.security.username_password.username", "root");
74-
securityMap.put("client.security.username_password.password", "password");
72+
securityMap.put(ConfigOptions.CLIENT_SECURITY_PROTOCOL.key(), "sasl");
73+
securityMap.put(ConfigOptions.CLIENT_SASL_MECHANISM.key(), "plain");
74+
securityMap.put("client.security.sasl.username", "root");
75+
securityMap.put("client.security.sasl.password", "password");
7576

7677
options.putAll(securityMap);
7778
FlinkCatalog actualCatalog2 =

fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/catalog/FlinkCatalogITCase.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -591,9 +591,13 @@ void testCreateTableWithUnknownOptions() {
591591
void testAuthentication() throws Exception {
592592
String clientListenerName = "CLIENT";
593593
Configuration serverConfig = new Configuration();
594+
serverConfig.setString(ConfigOptions.SERVER_SECURITY_PROTOCOL_MAP.key(), "CLIENT:sasl");
595+
serverConfig.setString("security.sasl.enabled.mechanisms", "plain");
594596
serverConfig.setString(
595-
ConfigOptions.SERVER_SECURITY_PROTOCOL_MAP.key(), "CLIENT:username_password");
596-
serverConfig.setString("security.username_password.credentials", "root:password");
597+
"security.sasl.plain.jaas.config",
598+
"com.alibaba.fluss.security.auth.sasl.plain.PlainLoginModule required "
599+
+ " user_root=\"password\" "
600+
+ " user_guest=\"password2\";");
597601
serverConfig.setString(ConfigOptions.SUPER_USERS.key(), "USER:root");
598602
FlussClusterExtension flussClusterExtension =
599603
FlussClusterExtension.builder()
@@ -629,9 +633,10 @@ void testAuthentication() throws Exception {
629633
"The connection has not completed authentication yet. This may be caused by a missing or incorrect configuration of 'client.security.protocol' on the client side.");
630634

631635
Map<String, String> clientConfig = new HashMap<>();
632-
clientConfig.put(ConfigOptions.CLIENT_SECURITY_PROTOCOL.key(), "username_password");
633-
clientConfig.put("client.security.username_password.username", "root");
634-
clientConfig.put("client.security.username_password.password", "password");
636+
clientConfig.put(ConfigOptions.CLIENT_SECURITY_PROTOCOL.key(), "sasl");
637+
clientConfig.put(ConfigOptions.CLIENT_SASL_MECHANISM.key(), "plain");
638+
clientConfig.put("client.security.sasl.username", "root");
639+
clientConfig.put("client.security.sasl.password", "password");
635640
authenticateCatalog =
636641
new FlinkCatalog(
637642
CATALOG_NAME,

fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/procedure/FlinkProcedureITCase.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,10 @@ void before() throws ExecutionException, InterruptedException {
7575
"create catalog %s with ( \n"
7676
+ "'type' = 'fluss', \n"
7777
+ "'bootstrap.servers' = '%s', \n"
78-
+ "'client.security.protocol' = 'username_password', \n"
79-
+ "'client.security.username_password.username' = 'root', \n"
80-
+ "'client.security.username_password.password' = 'password' \n"
78+
+ "'client.security.protocol' = 'sasl', \n"
79+
+ "'client.security.sasl.mechanism' = 'PLAIN', \n"
80+
+ "'client.security.sasl.username' = 'root', \n"
81+
+ "'client.security.sasl.password' = 'password' \n"
8182
+ ")",
8283
CATALOG_NAME, bootstrapServers);
8384
tEnv.executeSql(catalogDDL).await();
@@ -271,10 +272,14 @@ private static Configuration initConfig() {
271272
conf.set(ConfigOptions.CLIENT_WRITER_BATCH_SIZE, MemorySize.parse("1kb"));
272273

273274
// set security information.
275+
conf.setString(ConfigOptions.SERVER_SECURITY_PROTOCOL_MAP.key(), "CLIENT:sasl");
276+
conf.setString("security.sasl.enabled.mechanisms", "plain");
274277
conf.setString(
275-
ConfigOptions.SERVER_SECURITY_PROTOCOL_MAP.key(), "CLIENT:username_password");
276-
conf.setString("security.username_password.credentials", "root:password,guest:password2");
277-
conf.set(ConfigOptions.SUPER_USERS, "USER:root");
278+
"security.sasl.plain.jaas.config",
279+
"com.alibaba.fluss.security.auth.sasl.plain.PlainLoginModule required "
280+
+ " user_root=\"password\" "
281+
+ " user_guest=\"password2\";");
282+
conf.set(ConfigOptions.SUPER_USERS, "User:root");
278283
conf.set(ConfigOptions.AUTHORIZER_ENABLED, true);
279284
return conf;
280285
}

fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/security/acl/FlinkAuthorizationITCase.java

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ abstract class FlinkAuthorizationITCase extends AbstractTestBase {
7272
static final String CATALOG_NAME = "testcatalog";
7373
static final String ADMIN_CATALOG_NAME = "test_admin_catalog";
7474
static final String DEFAULT_DB = FlinkCatalogOptions.DEFAULT_DATABASE.defaultValue();
75-
static FlussPrincipal guest = new FlussPrincipal("guest", "USER");
75+
static FlussPrincipal guest = new FlussPrincipal("guest", "User");
7676
static Configuration clientConf;
7777

7878
private TableEnvironment tEnv;
@@ -81,7 +81,6 @@ abstract class FlinkAuthorizationITCase extends AbstractTestBase {
8181
@BeforeAll
8282
static void beforeAll() {
8383
clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig("CLIENT");
84-
clientConf.set(ConfigOptions.CLIENT_SECURITY_PROTOCOL, "username_password");
8584
}
8685

8786
@BeforeEach
@@ -96,9 +95,10 @@ void before() throws ExecutionException, InterruptedException {
9695
"create catalog %s with ( \n"
9796
+ "'type' = 'fluss', \n"
9897
+ "'bootstrap.servers' = '%s', \n"
99-
+ "'client.security.protocol' = 'username_password', \n"
100-
+ "'client.security.username_password.username' = 'guest', \n"
101-
+ "'client.security.username_password.password' = 'password2' \n"
98+
+ "'client.security.protocol' = 'sasl', \n"
99+
+ "'client.security.sasl.mechanism' = 'PLAIN', \n"
100+
+ "'client.security.sasl.username' = 'guest', \n"
101+
+ "'client.security.sasl.password' = 'password2' \n"
102102
+ ")",
103103
CATALOG_NAME, bootstrapServers);
104104
tEnv.executeSql(createCatalogDDL);
@@ -110,9 +110,10 @@ void before() throws ExecutionException, InterruptedException {
110110
"create catalog %s with ( \n"
111111
+ "'type' = 'fluss', \n"
112112
+ "'bootstrap.servers' = '%s', \n"
113-
+ "'client.security.protocol' = 'username_password', \n"
114-
+ "'client.security.username_password.username' = 'root', \n"
115-
+ "'client.security.username_password.password' = 'password' \n"
113+
+ "'client.security.protocol' = 'sasl', \n"
114+
+ "'client.security.sasl.mechanism' = 'PLAIN', \n"
115+
+ "'client.security.sasl.username' = 'root', \n"
116+
+ "'client.security.sasl.password' = 'password' \n"
116117
+ ")",
117118
ADMIN_CATALOG_NAME, bootstrapServers);
118119
tEnv.executeSql(createAminCatalogDDL).await();
@@ -421,10 +422,14 @@ private static Configuration initConfig() {
421422
conf.set(ConfigOptions.CLIENT_WRITER_BATCH_SIZE, MemorySize.parse("1kb"));
422423

423424
// set security information.
425+
conf.setString(ConfigOptions.SERVER_SECURITY_PROTOCOL_MAP.key(), "CLIENT:sasl");
426+
conf.setString("security.sasl.enabled.mechanisms", "plain");
424427
conf.setString(
425-
ConfigOptions.SERVER_SECURITY_PROTOCOL_MAP.key(), "CLIENT:username_password");
426-
conf.setString("security.username_password.credentials", "root:password,guest:password2");
427-
conf.set(ConfigOptions.SUPER_USERS, "USER:root");
428+
"security.sasl.plain.jaas.config",
429+
"com.alibaba.fluss.security.auth.sasl.plain.PlainLoginModule required "
430+
+ " user_root=\"password\" "
431+
+ " user_guest=\"password2\";");
432+
conf.set(ConfigOptions.SUPER_USERS, "User:root");
428433
conf.set(ConfigOptions.AUTHORIZER_ENABLED, true);
429434
return conf;
430435
}

0 commit comments

Comments
 (0)