Skip to content

Commit 14fc92c

Browse files
committed
[server] Add keep alive method for ServerAuthenticator for each rpc invocation.
1 parent a648cac commit 14fc92c

File tree

4 files changed

+99
-57
lines changed

4 files changed

+99
-57
lines changed

fluss-common/src/main/java/com/alibaba/fluss/security/auth/ServerAuthenticator.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,16 @@
2020
import com.alibaba.fluss.exception.AuthenticationException;
2121
import com.alibaba.fluss.security.acl.FlussPrincipal;
2222

23+
import java.io.Closeable;
24+
import java.io.IOException;
25+
2326
/**
2427
* Authenticator for server side.
2528
*
2629
* @since 0.7
2730
*/
2831
@PublicEvolving
29-
public interface ServerAuthenticator {
32+
public interface ServerAuthenticator extends Closeable {
3033

3134
String protocol();
3235

@@ -83,6 +86,30 @@ default void initialize(AuthenticateContext context) {}
8386
*/
8487
FlussPrincipal createPrincipal();
8588

89+
/**
90+
* Performs a lightweight authentication check during each RPC invocation.
91+
*
92+
* <p>For example, this method serves some purposes:
93+
*
94+
* <ul>
95+
* <li>Verifies that the current authentication state remains valid (e.g., session not
96+
* expired, token still active).
97+
* <li>Optionally refreshes or extends the session to prevent expiration.
98+
* </ul>
99+
*
100+
* <p>Implementations should ensure this method is non-blocking and efficient, as it may be
101+
* invoked on every RPC call. A caching strategy is recommended if remote or expensive
102+
* operations are involved.
103+
*
104+
* @throws AuthenticationException if the authentication has expired or become invalid
105+
*/
106+
default void keepAlive(short apiKey) throws AuthenticationException {}
107+
108+
/** Close the authenticator. */
109+
default void close() throws IOException {}
110+
86111
/** The context of the authentication process. */
87-
interface AuthenticateContext {}
112+
interface AuthenticateContext {
113+
String ipAddress();
114+
}
88115
}

fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/server/NettyServerHandler.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import com.alibaba.fluss.shaded.netty4.io.netty.handler.timeout.IdleState;
4040
import com.alibaba.fluss.shaded.netty4.io.netty.handler.timeout.IdleStateEvent;
4141
import com.alibaba.fluss.utils.ExceptionUtils;
42+
import com.alibaba.fluss.utils.IOUtils;
4243

4344
import org.slf4j.Logger;
4445
import org.slf4j.LoggerFactory;
@@ -139,6 +140,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
139140
// 3. the channel is complete, but receive auth request (PLAINTEXT case)
140141
handleAuthenticateRequest(apiKey, requestMessage, future);
141142
} else {
143+
if (state.isReady()) {
144+
authenticator.keepAlive(apiKey);
145+
}
142146
requestChannel.putRequest(request);
143147
}
144148

@@ -183,7 +187,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
183187
IdleStateEvent event = (IdleStateEvent) evt;
184188
if (event.state().equals(IdleState.ALL_IDLE)) {
185189
LOG.warn("Connection {} is idle, closing...", ctx.channel().remoteAddress());
186-
ctx.close();
190+
close();
187191
}
188192
}
189193
}
@@ -207,6 +211,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
207211

208212
private void close() {
209213
switchState(ConnectionState.CLOSE);
214+
IOUtils.closeQuietly(authenticator);
210215
ctx.close();
211216
}
212217

@@ -363,8 +368,18 @@ public boolean isActive() {
363368
public boolean isAuthenticating() {
364369
return this == AUTHENTICATING;
365370
}
371+
372+
public boolean isReady() {
373+
return this == READY;
374+
}
366375
}
367376

368-
private static class DefaultAuthenticateContext
369-
implements ServerAuthenticator.AuthenticateContext {}
377+
private class DefaultAuthenticateContext implements ServerAuthenticator.AuthenticateContext {
378+
@Override
379+
public String ipAddress() {
380+
return ((InetSocketAddress) ctx.channel().remoteAddress())
381+
.getAddress()
382+
.getHostAddress();
383+
}
384+
}
370385
}

fluss-rpc/src/test/java/com/alibaba/fluss/rpc/netty/authenticate/AuthenticationTest.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ public void cleanup() throws Exception {
6262
if (nettyServer != null) {
6363
nettyServer.close();
6464
}
65+
MutualAuthenticationPlugin.errorType = MutualAuthenticationPlugin.ErrorType.NONE;
6566
}
6667

6768
@Test
@@ -88,7 +89,8 @@ void testMutualAuthenticate() throws Exception {
8889
}
8990

9091
// test invalid challenge from server
91-
clientConfig.setString("client.security.mutual.error-type", "SERVER_ERROR_CHALLENGE");
92+
MutualAuthenticationPlugin.errorType =
93+
MutualAuthenticationPlugin.ErrorType.SERVER_ERROR_CHALLENGE;
9294
try (NettyClient nettyClient =
9395
new NettyClient(clientConfig, TestingClientMetricGroup.newInstance())) {
9496
assertThatThrownBy(() -> verifyGetTableNamesList(nettyClient, mutualAuthServerNode))
@@ -98,7 +100,8 @@ void testMutualAuthenticate() throws Exception {
98100
}
99101

100102
// test invalid token from client
101-
clientConfig.setString("client.security.mutual.error-type", "CLIENT_ERROR_SECOND_TOKEN");
103+
MutualAuthenticationPlugin.errorType =
104+
MutualAuthenticationPlugin.ErrorType.CLIENT_ERROR_SECOND_TOKE;
102105
try (NettyClient nettyClient =
103106
new NettyClient(clientConfig, TestingClientMetricGroup.newInstance())) {
104107
assertThatThrownBy(() -> verifyGetTableNamesList(nettyClient, mutualAuthServerNode))
@@ -111,7 +114,8 @@ void testMutualAuthenticate() throws Exception {
111114
void testNoChallengeBeforeClientComplete() throws Exception {
112115
Configuration clientConfig = new Configuration();
113116
clientConfig.set(ConfigOptions.CLIENT_SECURITY_PROTOCOL, "mutual");
114-
clientConfig.setString("client.security.mutual.error-type", "SERVER_NO_CHALLENGE");
117+
MutualAuthenticationPlugin.errorType =
118+
MutualAuthenticationPlugin.ErrorType.SERVER_NO_CHALLENGE;
115119
try (NettyClient nettyClient =
116120
new NettyClient(clientConfig, TestingClientMetricGroup.newInstance())) {
117121

@@ -126,7 +130,8 @@ void testNoChallengeBeforeClientComplete() throws Exception {
126130
void testRetirableAuthenticateException() throws Exception {
127131
Configuration clientConfig = new Configuration();
128132
clientConfig.set(ConfigOptions.CLIENT_SECURITY_PROTOCOL, "mutual");
129-
clientConfig.setString("client.security.mutual.error-type", "RETRIABLE_EXCEPTION");
133+
MutualAuthenticationPlugin.errorType =
134+
MutualAuthenticationPlugin.ErrorType.RETRIABLE_EXCEPTION;
130135
try (NettyClient nettyClient =
131136
new NettyClient(clientConfig, TestingClientMetricGroup.newInstance())) {
132137
verifyGetTableNamesList(nettyClient, mutualAuthServerNode);
@@ -203,6 +208,22 @@ void testMultiClientsWithSameProtocol() throws Exception {
203208
}
204209
}
205210

211+
@Test
212+
void testKeepAliveError() throws Exception {
213+
Configuration clientConfig = new Configuration();
214+
clientConfig.set(ConfigOptions.CLIENT_SECURITY_PROTOCOL, "mutual");
215+
try (NettyClient nettyClient =
216+
new NettyClient(clientConfig, TestingClientMetricGroup.newInstance())) {
217+
verifyGetTableNamesList(nettyClient, mutualAuthServerNode);
218+
MutualAuthenticationPlugin.errorType =
219+
MutualAuthenticationPlugin.ErrorType.KEEP_ALIVE_ERROR;
220+
assertThatThrownBy(() -> verifyGetTableNamesList(nettyClient, mutualAuthServerNode))
221+
.hasRootCauseExactlyInstanceOf(AuthenticationException.class)
222+
.rootCause()
223+
.hasMessageContaining("Keep alive error");
224+
}
225+
}
226+
206227
private void verifyGetTableNamesList(NettyClient nettyClient, ServerNode serverNode)
207228
throws Exception {
208229
ListTablesRequest request = new ListTablesRequest().setDatabaseName("test-database");

fluss-rpc/src/test/java/com/alibaba/fluss/rpc/netty/authenticate/MutualAuthenticationPlugin.java

Lines changed: 27 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package com.alibaba.fluss.rpc.netty.authenticate;
1818

19-
import com.alibaba.fluss.config.ConfigOption;
2019
import com.alibaba.fluss.config.Configuration;
2120
import com.alibaba.fluss.exception.AuthenticationException;
2221
import com.alibaba.fluss.exception.RetriableAuthenticationException;
@@ -28,23 +27,19 @@
2827

2928
import java.util.concurrent.ThreadLocalRandom;
3029

31-
import static com.alibaba.fluss.config.ConfigBuilder.key;
32-
3330
/**
3431
* An {@link com.alibaba.fluss.security.auth.AuthenticationPlugin} to mock mutual authentication.
3532
*/
3633
public class MutualAuthenticationPlugin
3734
implements ServerAuthenticationPlugin, ClientAuthenticationPlugin {
3835

3936
private static final String MUTUAL_AUTH_PROTOCOL = "mutual";
40-
private static final ConfigOption<ErrorType> ERROR_TYPE =
41-
key("client.security.mutual.error-type")
42-
.enumType(ErrorType.class)
43-
.defaultValue(ErrorType.NONE);
37+
38+
public static ErrorType errorType = ErrorType.NONE;
4439

4540
@Override
4641
public ClientAuthenticator createClientAuthenticator(Configuration configuration) {
47-
return new ClientAuthenticatorImpl(configuration);
42+
return new ClientAuthenticatorImpl();
4843
}
4944

5045
@Override
@@ -67,11 +62,6 @@ private enum Status {
6762

6863
private Status status;
6964
Integer initialSalt;
70-
private final int errorType;
71-
72-
public ClientAuthenticatorImpl(Configuration configuration) {
73-
this.errorType = configuration.get(ERROR_TYPE).code;
74-
}
7565

7666
@Override
7767
public String protocol() {
@@ -88,23 +78,16 @@ public void initialize(AuthenticateContext context) {
8878
public byte[] authenticate(byte[] data) throws AuthenticationException {
8979
switch (status) {
9080
case SEND_CLIENT_FIRST_MESSAGE:
91-
initialSalt =
92-
isError(
93-
errorType,
94-
ErrorType.SERVER_NO_CHALLENGE,
95-
ErrorType.SERVER_ERROR_CHALLENGE,
96-
ErrorType.RETRIABLE_EXCEPTION)
97-
? errorType
98-
: generateInitialSalt();
81+
initialSalt = generateInitialSalt();
9982
status = Status.RECEIVE_SERVER_FIRST_MESSAGE;
10083
return String.valueOf(initialSalt).getBytes();
10184
case RECEIVE_SERVER_FIRST_MESSAGE:
10285
int challenge = parseToken(data);
10386
if (challenge == initialSalt + 1) {
10487
status = Status.RECEIVE_SERVER_FINAL_MESSAGE;
10588
return String.valueOf(
106-
isError(errorType, ErrorType.CLIENT_ERROR_SECOND_TOKEN)
107-
? errorType
89+
errorType == ErrorType.CLIENT_ERROR_SECOND_TOKE
90+
? -1
10891
: challenge + 1)
10992
.getBytes();
11093
} else {
@@ -144,6 +127,7 @@ private enum Status {
144127
private Integer initialSalt;
145128
private int retryNumber = 0;
146129
private Status status = Status.RECEIVE_CLIENT_FIRST_MESSAGE;
130+
private String ip;
147131

148132
@Override
149133
public String protocol() {
@@ -153,6 +137,7 @@ public String protocol() {
153137
@Override
154138
public void initialize(AuthenticateContext context) {
155139
this.status = Status.RECEIVE_CLIENT_FIRST_MESSAGE;
140+
this.ip = context.ipAddress();
156141
this.initialSalt = null;
157142
}
158143

@@ -161,14 +146,14 @@ public byte[] evaluateResponse(byte[] token) throws AuthenticationException {
161146
int tokenValue = parseToken(token);
162147
switch (status) {
163148
case RECEIVE_CLIENT_FIRST_MESSAGE:
164-
if (isError(tokenValue, ErrorType.SERVER_NO_CHALLENGE)) {
149+
if (errorType == ErrorType.SERVER_NO_CHALLENGE) {
165150
return null;
166151
}
167-
if (isError(tokenValue, ErrorType.SERVER_ERROR_CHALLENGE)) {
152+
if (errorType == ErrorType.SERVER_ERROR_CHALLENGE) {
168153
return "-1".getBytes();
169154
}
170-
if (isError(tokenValue, ErrorType.RETRIABLE_EXCEPTION) && retryNumber++ < 3) {
171-
throw new RetriableAuthenticationException("Retriable exception");
155+
if (errorType == ErrorType.RETRIABLE_EXCEPTION && retryNumber++ < 3) {
156+
throw new RetriableAuthenticationException("Retriable exception " + ip);
172157
}
173158

174159
initialSalt = tokenValue + 1;
@@ -199,6 +184,13 @@ public FlussPrincipal createPrincipal() {
199184
public boolean isCompleted() {
200185
return status == Status.COMPLETED;
201186
}
187+
188+
@Override
189+
public void keepAlive(short apiKey) throws AuthenticationException {
190+
if (errorType == ErrorType.KEEP_ALIVE_ERROR) {
191+
throw new AuthenticationException("Keep alive error");
192+
}
193+
}
202194
}
203195

204196
private static int parseToken(byte[] token) {
@@ -212,26 +204,13 @@ private static int generateInitialSalt() {
212204
return ThreadLocalRandom.current().nextInt(0, Integer.MAX_VALUE);
213205
}
214206

215-
private static boolean isError(int errorType, ErrorType... errorTypes) {
216-
for (ErrorType type : errorTypes) {
217-
if (errorType == type.code) {
218-
return true;
219-
}
220-
}
221-
return false;
222-
}
223-
224-
enum ErrorType {
225-
NONE(-1),
226-
SERVER_NO_CHALLENGE(-2),
227-
SERVER_ERROR_CHALLENGE(-3),
228-
CLIENT_ERROR_SECOND_TOKEN(-4),
229-
RETRIABLE_EXCEPTION(-5);
230-
231-
final int code;
232-
233-
ErrorType(int code) {
234-
this.code = code;
235-
}
207+
/** Error types for testing. */
208+
public enum ErrorType {
209+
NONE,
210+
SERVER_NO_CHALLENGE,
211+
SERVER_ERROR_CHALLENGE,
212+
CLIENT_ERROR_SECOND_TOKE,
213+
RETRIABLE_EXCEPTION,
214+
KEEP_ALIVE_ERROR
236215
}
237216
}

0 commit comments

Comments
 (0)