Skip to content

Commit d69c8f9

Browse files
[server] Add keep alive method for ServerAuthenticator for each rpc invocation. (#956)
1 parent cb22c69 commit d69c8f9

File tree

4 files changed

+108
-58
lines changed

4 files changed

+108
-58
lines changed

fluss-common/src/main/java/com/alibaba/fluss/security/auth/ServerAuthenticator.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,16 @@
2020
import com.alibaba.fluss.exception.AuthenticationException;
2121
import com.alibaba.fluss.security.acl.FlussPrincipal;
2222

23+
import java.io.Closeable;
24+
import java.io.IOException;
25+
2326
/**
2427
* Authenticator for server side.
2528
*
2629
* @since 0.7
2730
*/
2831
@PublicEvolving
29-
public interface ServerAuthenticator {
32+
public interface ServerAuthenticator extends Closeable {
3033

3134
String protocol();
3235

@@ -83,6 +86,30 @@ default void initialize(AuthenticateContext context) {}
8386
*/
8487
FlussPrincipal createPrincipal();
8588

89+
/**
90+
* Performs a lightweight authentication check during each RPC invocation.
91+
*
92+
* <p>For example, this method serves some purposes:
93+
*
94+
* <ul>
95+
* <li>Verifies that the current authentication state remains valid (e.g., session not
96+
* expired, token still active).
97+
* <li>Optionally refreshes or extends the session to prevent expiration.
98+
* </ul>
99+
*
100+
* <p>Implementations should ensure this method is non-blocking and efficient, as it may be
101+
* invoked on every RPC call. A caching strategy is recommended if remote or expensive
102+
* operations are involved.
103+
*
104+
* @throws AuthenticationException if the authentication has expired or become invalid
105+
*/
106+
default void keepAlive(short apiKey) throws AuthenticationException {}
107+
108+
/** Close the authenticator. */
109+
default void close() throws IOException {}
110+
86111
/** The context of the authentication process. */
87-
interface AuthenticateContext {}
112+
interface AuthenticateContext {
113+
String ipAddress();
114+
}
88115
}

fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/server/NettyServerHandler.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import com.alibaba.fluss.shaded.netty4.io.netty.handler.timeout.IdleState;
4040
import com.alibaba.fluss.shaded.netty4.io.netty.handler.timeout.IdleStateEvent;
4141
import com.alibaba.fluss.utils.ExceptionUtils;
42+
import com.alibaba.fluss.utils.IOUtils;
4243

4344
import org.slf4j.Logger;
4445
import org.slf4j.LoggerFactory;
@@ -139,6 +140,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
139140
// 3. the channel is complete, but receive auth request (PLAINTEXT case)
140141
handleAuthenticateRequest(apiKey, requestMessage, future);
141142
} else {
143+
if (state.isReady()) {
144+
authenticator.keepAlive(apiKey);
145+
}
142146
requestChannel.putRequest(request);
143147
}
144148

@@ -175,6 +179,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
175179
@Override
176180
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
177181
super.channelInactive(ctx);
182+
close();
178183
}
179184

180185
@Override
@@ -183,7 +188,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
183188
IdleStateEvent event = (IdleStateEvent) evt;
184189
if (event.state().equals(IdleState.ALL_IDLE)) {
185190
LOG.warn("Connection {} is idle, closing...", ctx.channel().remoteAddress());
186-
ctx.close();
191+
close();
187192
}
188193
}
189194
}
@@ -207,6 +212,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
207212

208213
private void close() {
209214
switchState(ConnectionState.CLOSE);
215+
IOUtils.closeQuietly(authenticator);
210216
ctx.close();
211217
}
212218

@@ -363,8 +369,18 @@ public boolean isActive() {
363369
public boolean isAuthenticating() {
364370
return this == AUTHENTICATING;
365371
}
372+
373+
public boolean isReady() {
374+
return this == READY;
375+
}
366376
}
367377

368-
private static class DefaultAuthenticateContext
369-
implements ServerAuthenticator.AuthenticateContext {}
378+
private class DefaultAuthenticateContext implements ServerAuthenticator.AuthenticateContext {
379+
@Override
380+
public String ipAddress() {
381+
return ((InetSocketAddress) ctx.channel().remoteAddress())
382+
.getAddress()
383+
.getHostAddress();
384+
}
385+
}
370386
}

fluss-rpc/src/test/java/com/alibaba/fluss/rpc/netty/authenticate/AuthenticationTest.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ public void cleanup() throws Exception {
6262
if (nettyServer != null) {
6363
nettyServer.close();
6464
}
65+
MutualAuthenticationPlugin.errorType = MutualAuthenticationPlugin.ErrorType.NONE;
6566
}
6667

6768
@Test
@@ -88,7 +89,8 @@ void testMutualAuthenticate() throws Exception {
8889
}
8990

9091
// test invalid challenge from server
91-
clientConfig.setString("client.security.mutual.error-type", "SERVER_ERROR_CHALLENGE");
92+
MutualAuthenticationPlugin.errorType =
93+
MutualAuthenticationPlugin.ErrorType.SERVER_ERROR_CHALLENGE;
9294
try (NettyClient nettyClient =
9395
new NettyClient(clientConfig, TestingClientMetricGroup.newInstance())) {
9496
assertThatThrownBy(() -> verifyGetTableNamesList(nettyClient, mutualAuthServerNode))
@@ -98,7 +100,8 @@ void testMutualAuthenticate() throws Exception {
98100
}
99101

100102
// test invalid token from client
101-
clientConfig.setString("client.security.mutual.error-type", "CLIENT_ERROR_SECOND_TOKEN");
103+
MutualAuthenticationPlugin.errorType =
104+
MutualAuthenticationPlugin.ErrorType.CLIENT_ERROR_SECOND_TOKE;
102105
try (NettyClient nettyClient =
103106
new NettyClient(clientConfig, TestingClientMetricGroup.newInstance())) {
104107
assertThatThrownBy(() -> verifyGetTableNamesList(nettyClient, mutualAuthServerNode))
@@ -111,7 +114,8 @@ void testMutualAuthenticate() throws Exception {
111114
void testNoChallengeBeforeClientComplete() throws Exception {
112115
Configuration clientConfig = new Configuration();
113116
clientConfig.set(ConfigOptions.CLIENT_SECURITY_PROTOCOL, "mutual");
114-
clientConfig.setString("client.security.mutual.error-type", "SERVER_NO_CHALLENGE");
117+
MutualAuthenticationPlugin.errorType =
118+
MutualAuthenticationPlugin.ErrorType.SERVER_NO_CHALLENGE;
115119
try (NettyClient nettyClient =
116120
new NettyClient(clientConfig, TestingClientMetricGroup.newInstance())) {
117121

@@ -126,7 +130,8 @@ void testNoChallengeBeforeClientComplete() throws Exception {
126130
void testRetirableAuthenticateException() throws Exception {
127131
Configuration clientConfig = new Configuration();
128132
clientConfig.set(ConfigOptions.CLIENT_SECURITY_PROTOCOL, "mutual");
129-
clientConfig.setString("client.security.mutual.error-type", "RETRIABLE_EXCEPTION");
133+
MutualAuthenticationPlugin.errorType =
134+
MutualAuthenticationPlugin.ErrorType.RETRIABLE_EXCEPTION;
130135
try (NettyClient nettyClient =
131136
new NettyClient(clientConfig, TestingClientMetricGroup.newInstance())) {
132137
verifyGetTableNamesList(nettyClient, mutualAuthServerNode);
@@ -203,6 +208,22 @@ void testMultiClientsWithSameProtocol() throws Exception {
203208
}
204209
}
205210

211+
@Test
212+
void testKeepAliveError() throws Exception {
213+
Configuration clientConfig = new Configuration();
214+
clientConfig.set(ConfigOptions.CLIENT_SECURITY_PROTOCOL, "mutual");
215+
try (NettyClient nettyClient =
216+
new NettyClient(clientConfig, TestingClientMetricGroup.newInstance())) {
217+
verifyGetTableNamesList(nettyClient, mutualAuthServerNode);
218+
MutualAuthenticationPlugin.errorType =
219+
MutualAuthenticationPlugin.ErrorType.KEEP_ALIVE_ERROR;
220+
assertThatThrownBy(() -> verifyGetTableNamesList(nettyClient, mutualAuthServerNode))
221+
.hasRootCauseExactlyInstanceOf(AuthenticationException.class)
222+
.rootCause()
223+
.hasMessageContaining("Keep alive error");
224+
}
225+
}
226+
206227
private void verifyGetTableNamesList(NettyClient nettyClient, ServerNode serverNode)
207228
throws Exception {
208229
ListTablesRequest request = new ListTablesRequest().setDatabaseName("test-database");

fluss-rpc/src/test/java/com/alibaba/fluss/rpc/netty/authenticate/MutualAuthenticationPlugin.java

Lines changed: 35 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package com.alibaba.fluss.rpc.netty.authenticate;
1818

19-
import com.alibaba.fluss.config.ConfigOption;
2019
import com.alibaba.fluss.config.Configuration;
2120
import com.alibaba.fluss.exception.AuthenticationException;
2221
import com.alibaba.fluss.exception.RetriableAuthenticationException;
@@ -26,25 +25,22 @@
2625
import com.alibaba.fluss.security.auth.ServerAuthenticationPlugin;
2726
import com.alibaba.fluss.security.auth.ServerAuthenticator;
2827

28+
import java.io.IOException;
2929
import java.util.concurrent.ThreadLocalRandom;
3030

31-
import static com.alibaba.fluss.config.ConfigBuilder.key;
32-
3331
/**
3432
* An {@link com.alibaba.fluss.security.auth.AuthenticationPlugin} to mock mutual authentication.
3533
*/
3634
public class MutualAuthenticationPlugin
3735
implements ServerAuthenticationPlugin, ClientAuthenticationPlugin {
3836

3937
private static final String MUTUAL_AUTH_PROTOCOL = "mutual";
40-
private static final ConfigOption<ErrorType> ERROR_TYPE =
41-
key("client.security.mutual.error-type")
42-
.enumType(ErrorType.class)
43-
.defaultValue(ErrorType.NONE);
38+
39+
public static ErrorType errorType = ErrorType.NONE;
4440

4541
@Override
4642
public ClientAuthenticator createClientAuthenticator(Configuration configuration) {
47-
return new ClientAuthenticatorImpl(configuration);
43+
return new ClientAuthenticatorImpl();
4844
}
4945

5046
@Override
@@ -67,11 +63,6 @@ private enum Status {
6763

6864
private Status status;
6965
Integer initialSalt;
70-
private final int errorType;
71-
72-
public ClientAuthenticatorImpl(Configuration configuration) {
73-
this.errorType = configuration.get(ERROR_TYPE).code;
74-
}
7566

7667
@Override
7768
public String protocol() {
@@ -88,23 +79,16 @@ public void initialize(AuthenticateContext context) {
8879
public byte[] authenticate(byte[] data) throws AuthenticationException {
8980
switch (status) {
9081
case SEND_CLIENT_FIRST_MESSAGE:
91-
initialSalt =
92-
isError(
93-
errorType,
94-
ErrorType.SERVER_NO_CHALLENGE,
95-
ErrorType.SERVER_ERROR_CHALLENGE,
96-
ErrorType.RETRIABLE_EXCEPTION)
97-
? errorType
98-
: generateInitialSalt();
82+
initialSalt = generateInitialSalt();
9983
status = Status.RECEIVE_SERVER_FIRST_MESSAGE;
10084
return String.valueOf(initialSalt).getBytes();
10185
case RECEIVE_SERVER_FIRST_MESSAGE:
10286
int challenge = parseToken(data);
10387
if (challenge == initialSalt + 1) {
10488
status = Status.RECEIVE_SERVER_FINAL_MESSAGE;
10589
return String.valueOf(
106-
isError(errorType, ErrorType.CLIENT_ERROR_SECOND_TOKEN)
107-
? errorType
90+
errorType == ErrorType.CLIENT_ERROR_SECOND_TOKE
91+
? -1
10892
: challenge + 1)
10993
.getBytes();
11094
} else {
@@ -138,12 +122,14 @@ private static class ServerAuthenticatorImpl implements ServerAuthenticator {
138122
private enum Status {
139123
RECEIVE_CLIENT_FIRST_MESSAGE,
140124
RECEIVE_CLIENT_FINAL_MESSAGE,
141-
COMPLETED
125+
COMPLETED,
126+
CLOSED
142127
}
143128

144129
private Integer initialSalt;
145130
private int retryNumber = 0;
146131
private Status status = Status.RECEIVE_CLIENT_FIRST_MESSAGE;
132+
private String ip;
147133

148134
@Override
149135
public String protocol() {
@@ -153,6 +139,7 @@ public String protocol() {
153139
@Override
154140
public void initialize(AuthenticateContext context) {
155141
this.status = Status.RECEIVE_CLIENT_FIRST_MESSAGE;
142+
this.ip = context.ipAddress();
156143
this.initialSalt = null;
157144
}
158145

@@ -161,14 +148,14 @@ public byte[] evaluateResponse(byte[] token) throws AuthenticationException {
161148
int tokenValue = parseToken(token);
162149
switch (status) {
163150
case RECEIVE_CLIENT_FIRST_MESSAGE:
164-
if (isError(tokenValue, ErrorType.SERVER_NO_CHALLENGE)) {
151+
if (errorType == ErrorType.SERVER_NO_CHALLENGE) {
165152
return null;
166153
}
167-
if (isError(tokenValue, ErrorType.SERVER_ERROR_CHALLENGE)) {
154+
if (errorType == ErrorType.SERVER_ERROR_CHALLENGE) {
168155
return "-1".getBytes();
169156
}
170-
if (isError(tokenValue, ErrorType.RETRIABLE_EXCEPTION) && retryNumber++ < 3) {
171-
throw new RetriableAuthenticationException("Retriable exception");
157+
if (errorType == ErrorType.RETRIABLE_EXCEPTION && retryNumber++ < 3) {
158+
throw new RetriableAuthenticationException("Retriable exception " + ip);
172159
}
173160

174161
initialSalt = tokenValue + 1;
@@ -199,6 +186,18 @@ public FlussPrincipal createPrincipal() {
199186
public boolean isCompleted() {
200187
return status == Status.COMPLETED;
201188
}
189+
190+
@Override
191+
public void keepAlive(short apiKey) throws AuthenticationException {
192+
if (errorType == ErrorType.KEEP_ALIVE_ERROR) {
193+
throw new AuthenticationException("Keep alive error");
194+
}
195+
}
196+
197+
@Override
198+
public void close() throws IOException {
199+
status = Status.CLOSED;
200+
}
202201
}
203202

204203
private static int parseToken(byte[] token) {
@@ -212,26 +211,13 @@ private static int generateInitialSalt() {
212211
return ThreadLocalRandom.current().nextInt(0, Integer.MAX_VALUE);
213212
}
214213

215-
private static boolean isError(int errorType, ErrorType... errorTypes) {
216-
for (ErrorType type : errorTypes) {
217-
if (errorType == type.code) {
218-
return true;
219-
}
220-
}
221-
return false;
222-
}
223-
224-
enum ErrorType {
225-
NONE(-1),
226-
SERVER_NO_CHALLENGE(-2),
227-
SERVER_ERROR_CHALLENGE(-3),
228-
CLIENT_ERROR_SECOND_TOKEN(-4),
229-
RETRIABLE_EXCEPTION(-5);
230-
231-
final int code;
232-
233-
ErrorType(int code) {
234-
this.code = code;
235-
}
214+
/** Error types for testing. */
215+
public enum ErrorType {
216+
NONE,
217+
SERVER_NO_CHALLENGE,
218+
SERVER_ERROR_CHALLENGE,
219+
CLIENT_ERROR_SECOND_TOKE,
220+
RETRIABLE_EXCEPTION,
221+
KEEP_ALIVE_ERROR
236222
}
237223
}

0 commit comments

Comments
 (0)