Skip to content

Commit 0823598

Browse files
[server][auth] Support SASL/PLAIN authentication. (#985)
1 parent f2d4951 commit 0823598

33 files changed

+2926
-30
lines changed

LICENSE

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,19 @@ Apache Kafka
312312
./fluss-common/src/main/java/com/alibaba/fluss/utils/log/ByteBufferUnmapper.java
313313
./fluss-common/src/main/java/com/alibaba/fluss/utils/log/FairBucketStatusMap.java
314314
./fluss-common/src/main/java/com/alibaba/fluss/utils/ExponentialBackoff.java
315+
./fluss-common/src/main/java/com/alibaba/fluss/security/auth/sasl/jaas/AuthenticateCallbackHandler.java
316+
./fluss-common/src/main/java/com/alibaba/fluss/security/auth/sasl/jaas/DefaultLogin.java
317+
./fluss-common/src/main/java/com/alibaba/fluss/security/auth/sasl/jaas/JaasConfig.java
318+
./fluss-common/src/main/java/com/alibaba/fluss/security/auth/sasl/jaas/JaasContext.java
319+
./fluss-common/src/main/java/com/alibaba/fluss/security/auth/sasl/jaas/Login.java
320+
./fluss-common/src/main/java/com/alibaba/fluss/security/auth/sasl/jaas/LoginManager.java
321+
./fluss-common/src/main/java/com/alibaba/fluss/security/auth/sasl/jaas/SaslClientCallbackHandler.java
322+
./fluss-common/src/main/java/com/alibaba/fluss/security/auth/sasl/jaas/SaslServerFactory.java
323+
./fluss-common/src/main/java/com/alibaba/fluss/security/auth/sasl/plain/PlainAuthenticateCallback.java
324+
./fluss-common/src/main/java/com/alibaba/fluss/security/auth/sasl/plain/PlainLoginModule.java
325+
./fluss-common/src/main/java/com/alibaba/fluss/security/auth/sasl/plain/PlainSaslServer.java
326+
./fluss-common/src/main/java/com/alibaba/fluss/security/auth/sasl/plain/PlainSaslServerProvider.java
327+
./fluss-common/src/main/java/com/alibaba/fluss/security/auth/sasl/plain/PlainServerCallbackHandler.java
315328
./fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/statemachine/ReplicaStateMachine.java
316329
./fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/statemachine/TableBucketStateMachine.java
317330
./fluss-server/src/main/java/com/alibaba/fluss/server/log/AbstractIndex.java

fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import static com.alibaba.fluss.config.ConfigOptions.InfoLogLevel.INFO_LEVEL;
4141
import static com.alibaba.fluss.config.ConfigOptions.NoKeyAssigner.ROUND_ROBIN;
4242
import static com.alibaba.fluss.config.ConfigOptions.NoKeyAssigner.STICKY;
43+
import static com.alibaba.fluss.security.auth.sasl.jaas.JaasContext.SASL_JAAS_CONFIG;
4344

4445
/**
4546
* Config options for Fluss.
@@ -177,6 +178,9 @@ public class ConfigOptions {
177178
"The maximum number of buckets that can be created for a table."
178179
+ "The default value is 128000");
179180

181+
public static final ConfigOption<List<String>> SERVER_SASL_ENABLED_MECHANISMS_CONFIG =
182+
key("security.sasl.enabled.mechanisms").stringType().asList().noDefaultValue();
183+
180184
// ------------------------------------------------------------------------
181185
// ConfigOptions for Coordinator Server
182186
// ------------------------------------------------------------------------
@@ -1089,6 +1093,21 @@ public class ConfigOptions {
10891093
"Enable metrics for client. When metrics is enabled, the client "
10901094
+ "will collect metrics and report by the JMX metrics reporter.");
10911095

1096+
public static final ConfigOption<String> CLIENT_MECHANISM =
1097+
key("client.security.sasl.mechanism")
1098+
.stringType()
1099+
.noDefaultValue()
1100+
.withDescription(
1101+
"SASL mechanism to use for authentication.Currently, we only support plain.");
1102+
1103+
public static final ConfigOption<String> CLIENT_SASL_JAAS_CONFIG =
1104+
key("client.security.sasl." + SASL_JAAS_CONFIG)
1105+
.stringType()
1106+
.noDefaultValue()
1107+
.withDescription(
1108+
"JAAS configuration string for the client. If not provided, uses the JVM option -Djava.security.auth.login.config. \n"
1109+
+ "Example: com.alibaba.fluss.security.auth.sasl.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\")");
1110+
10921111
// ------------------------------------------------------------------------
10931112
// ConfigOptions for Fluss Table
10941113
// ------------------------------------------------------------------------

fluss-common/src/main/java/com/alibaba/fluss/security/auth/AuthenticationFactory.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,6 @@
5656
*/
5757
@PublicEvolving
5858
public class AuthenticationFactory {
59-
private static final String SERVER_AUTHENTICATOR_PREFIX = "security.";
60-
6159
/**
6260
* Loads a supplier for a client authenticator based on the configuration.
6361
*

fluss-common/src/main/java/com/alibaba/fluss/security/auth/ClientAuthenticator.java

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,34 @@
2121

2222
import javax.annotation.Nullable;
2323

24+
import java.io.Closeable;
25+
2426
/** Authenticator for client side. */
2527
@PublicEvolving
26-
public interface ClientAuthenticator {
28+
public interface ClientAuthenticator extends Closeable {
2729

2830
/** The protocol name of the authenticator, which will send in the AuthenticateRequest. */
2931
String protocol();
3032

3133
/** Initialize the authenticator. */
32-
default void initialize(AuthenticateContext context) {}
34+
default void initialize(AuthenticateContext context) throws AuthenticationException {}
35+
36+
/**
37+
* Determines whether the client authenticator should proactively send an initial token to the
38+
* server.
39+
*
40+
* <p>When this method returns {@code true}, it indicates that the client is the initiator of
41+
* the authentication exchange and should actively call {@link #authenticate(byte[])
42+
* authenticate(new byte[0])} to generate and send the initial token without waiting for a
43+
* challenge from the server.
44+
*
45+
* @return {@code true} if the client should initiate authentication by sending an initial
46+
* token; {@code false} if the client expects to receive the first token or challenge from
47+
* the server.
48+
*/
49+
default boolean hasInitialTokenResponse() {
50+
return true;
51+
}
3352

3453
/**
3554
* * Generates the initial token or calculates a token based on the server's challenge, then
@@ -79,6 +98,10 @@ default void initialize(AuthenticateContext context) {}
7998
/** Checks if the authentication from client side is completed. */
8099
boolean isCompleted();
81100

101+
default void close() {}
102+
82103
/** The context of the authentication process. */
83-
interface AuthenticateContext {}
104+
interface AuthenticateContext {
105+
String ipAddress();
106+
}
84107
}

fluss-common/src/main/java/com/alibaba/fluss/security/auth/ServerAuthenticator.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,28 @@
2020
import com.alibaba.fluss.exception.AuthenticationException;
2121
import com.alibaba.fluss.security.acl.FlussPrincipal;
2222

23+
import java.io.Closeable;
24+
import java.io.IOException;
25+
2326
/**
2427
* Authenticator for server side.
2528
*
2629
* @since 0.7
2730
*/
2831
@PublicEvolving
29-
public interface ServerAuthenticator {
32+
public interface ServerAuthenticator extends Closeable {
3033

3134
String protocol();
3235

36+
default void matchProtocol(String protocol) throws AuthenticationException {
37+
if (!protocol().equalsIgnoreCase(protocol)) {
38+
throw new AuthenticationException(
39+
String.format(
40+
"Authenticate protocol not match: protocol of server is '%s' while protocol of client is '%s'",
41+
protocol(), protocol));
42+
}
43+
}
44+
3345
/** Initialize the authenticator. */
3446
default void initialize(AuthenticateContext context) {}
3547

@@ -83,6 +95,15 @@ default void initialize(AuthenticateContext context) {}
8395
*/
8496
FlussPrincipal createPrincipal();
8597

98+
/** Close the authenticator. */
99+
default void close() throws IOException {}
100+
86101
/** The context of the authentication process. */
87-
interface AuthenticateContext {}
102+
interface AuthenticateContext {
103+
String ipAddress();
104+
105+
String listenerName();
106+
107+
String protocol();
108+
}
88109
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright (c) 2025 Alibaba Group Holding Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.alibaba.fluss.security.auth.sasl.authenticator;
18+
19+
import com.alibaba.fluss.config.Configuration;
20+
import com.alibaba.fluss.security.auth.ClientAuthenticationPlugin;
21+
import com.alibaba.fluss.security.auth.ClientAuthenticator;
22+
import com.alibaba.fluss.security.auth.ServerAuthenticationPlugin;
23+
import com.alibaba.fluss.security.auth.ServerAuthenticator;
24+
25+
/** Authentication plugin for SASL. */
26+
public class SaslAuthenticationPlugin
27+
implements ClientAuthenticationPlugin, ServerAuthenticationPlugin {
28+
static final String SASL_AUTH_PROTOCOL = "sasl";
29+
30+
@Override
31+
public ClientAuthenticator createClientAuthenticator(Configuration configuration) {
32+
return new SaslClientAuthenticator(configuration);
33+
}
34+
35+
@Override
36+
public ServerAuthenticator createServerAuthenticator(Configuration configuration) {
37+
return new SaslServerAuthenticator(configuration);
38+
}
39+
40+
@Override
41+
public String authProtocol() {
42+
return SASL_AUTH_PROTOCOL;
43+
}
44+
}
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Copyright (c) 2025 Alibaba Group Holding Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.alibaba.fluss.security.auth.sasl.authenticator;
18+
19+
import com.alibaba.fluss.config.Configuration;
20+
import com.alibaba.fluss.exception.AuthenticationException;
21+
import com.alibaba.fluss.security.auth.ClientAuthenticator;
22+
import com.alibaba.fluss.security.auth.sasl.jaas.JaasContext;
23+
import com.alibaba.fluss.security.auth.sasl.jaas.LoginManager;
24+
25+
import javax.annotation.Nullable;
26+
import javax.security.auth.login.LoginException;
27+
import javax.security.sasl.SaslClient;
28+
29+
import java.util.Map;
30+
31+
import static com.alibaba.fluss.config.ConfigOptions.CLIENT_MECHANISM;
32+
import static com.alibaba.fluss.config.ConfigOptions.CLIENT_SASL_JAAS_CONFIG;
33+
import static com.alibaba.fluss.security.auth.sasl.jaas.SaslServerFactory.createSaslClient;
34+
35+
/** An authenticator that uses SASL to authenticate with a server. */
36+
public class SaslClientAuthenticator implements ClientAuthenticator {
37+
private final String mechanism;
38+
private final Map<String, String> pros;
39+
private final String jaasConfig;
40+
41+
private SaslClient saslClient;
42+
private LoginManager loginManager;
43+
44+
public SaslClientAuthenticator(Configuration configuration) {
45+
this.mechanism = configuration.get(CLIENT_MECHANISM).toUpperCase();
46+
this.jaasConfig = configuration.getString(CLIENT_SASL_JAAS_CONFIG);
47+
this.pros = configuration.toMap();
48+
}
49+
50+
@Override
51+
public String protocol() {
52+
return mechanism;
53+
}
54+
55+
@Nullable
56+
@Override
57+
public byte[] authenticate(byte[] data) throws AuthenticationException {
58+
try {
59+
return saslClient.evaluateChallenge(data);
60+
} catch (Exception e) {
61+
throw new AuthenticationException("Failed to evaluate SASL challenge", e);
62+
}
63+
}
64+
65+
@Override
66+
public boolean isCompleted() {
67+
return saslClient.isComplete();
68+
}
69+
70+
@Override
71+
public boolean hasInitialTokenResponse() {
72+
return saslClient.hasInitialResponse();
73+
}
74+
75+
@Override
76+
public void initialize(AuthenticateContext context) throws AuthenticationException {
77+
String hostAddress = context.ipAddress();
78+
JaasContext jaasContext = JaasContext.loadClientContext(jaasConfig);
79+
80+
try {
81+
loginManager = LoginManager.acquireLoginManager(jaasContext);
82+
} catch (LoginException exception) {
83+
throw new AuthenticationException("Failed to load login manager", exception);
84+
}
85+
86+
try {
87+
saslClient = createSaslClient(mechanism, hostAddress, pros, loginManager);
88+
} catch (Exception e) {
89+
throw new AuthenticationException("Failed to create SASL client", e);
90+
}
91+
92+
if (saslClient == null) {
93+
throw new AuthenticationException(
94+
"Unable to find a matching SASL mechanism for " + mechanism);
95+
}
96+
}
97+
98+
@Override
99+
public void close() {
100+
if (loginManager != null) {
101+
loginManager.release();
102+
}
103+
}
104+
}

0 commit comments

Comments
 (0)