Skip to content

Commit 7aed531

Browse files
committed
[server] Revert keep alive method for ServerAuthenticator for each rpc invocation.
1 parent 84e0c8d commit 7aed531

File tree

4 files changed

+58
-108
lines changed

4 files changed

+58
-108
lines changed

fluss-common/src/main/java/com/alibaba/fluss/security/auth/ServerAuthenticator.java

Lines changed: 2 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,13 @@
2020
import com.alibaba.fluss.exception.AuthenticationException;
2121
import com.alibaba.fluss.security.acl.FlussPrincipal;
2222

23-
import java.io.Closeable;
24-
import java.io.IOException;
25-
2623
/**
2724
* Authenticator for server side.
2825
*
2926
* @since 0.7
3027
*/
3128
@PublicEvolving
32-
public interface ServerAuthenticator extends Closeable {
29+
public interface ServerAuthenticator {
3330

3431
String protocol();
3532

@@ -86,30 +83,6 @@ default void initialize(AuthenticateContext context) {}
8683
*/
8784
FlussPrincipal createPrincipal();
8885

89-
/**
90-
* Performs a lightweight authentication check during each RPC invocation.
91-
*
92-
* <p>For example, this method serves some purposes:
93-
*
94-
* <ul>
95-
* <li>Verifies that the current authentication state remains valid (e.g., session not
96-
* expired, token still active).
97-
* <li>Optionally refreshes or extends the session to prevent expiration.
98-
* </ul>
99-
*
100-
* <p>Implementations should ensure this method is non-blocking and efficient, as it may be
101-
* invoked on every RPC call. A caching strategy is recommended if remote or expensive
102-
* operations are involved.
103-
*
104-
* @throws AuthenticationException if the authentication has expired or become invalid
105-
*/
106-
default void keepAlive(short apiKey) throws AuthenticationException {}
107-
108-
/** Close the authenticator. */
109-
default void close() throws IOException {}
110-
11186
/** The context of the authentication process. */
112-
interface AuthenticateContext {
113-
String ipAddress();
114-
}
87+
interface AuthenticateContext {}
11588
}

fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/server/NettyServerHandler.java

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import com.alibaba.fluss.shaded.netty4.io.netty.handler.timeout.IdleState;
4040
import com.alibaba.fluss.shaded.netty4.io.netty.handler.timeout.IdleStateEvent;
4141
import com.alibaba.fluss.utils.ExceptionUtils;
42-
import com.alibaba.fluss.utils.IOUtils;
4342

4443
import org.slf4j.Logger;
4544
import org.slf4j.LoggerFactory;
@@ -140,9 +139,6 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
140139
// 3. the channel is complete, but receive auth request (PLAINTEXT case)
141140
handleAuthenticateRequest(apiKey, requestMessage, future);
142141
} else {
143-
if (state.isReady()) {
144-
authenticator.keepAlive(apiKey);
145-
}
146142
requestChannel.putRequest(request);
147143
}
148144

@@ -179,7 +175,6 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
179175
@Override
180176
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
181177
super.channelInactive(ctx);
182-
close();
183178
}
184179

185180
@Override
@@ -188,7 +183,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
188183
IdleStateEvent event = (IdleStateEvent) evt;
189184
if (event.state().equals(IdleState.ALL_IDLE)) {
190185
LOG.warn("Connection {} is idle, closing...", ctx.channel().remoteAddress());
191-
close();
186+
ctx.close();
192187
}
193188
}
194189
}
@@ -212,7 +207,6 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
212207

213208
private void close() {
214209
switchState(ConnectionState.CLOSE);
215-
IOUtils.closeQuietly(authenticator);
216210
ctx.close();
217211
}
218212

@@ -369,18 +363,8 @@ public boolean isActive() {
369363
public boolean isAuthenticating() {
370364
return this == AUTHENTICATING;
371365
}
372-
373-
public boolean isReady() {
374-
return this == READY;
375-
}
376366
}
377367

378-
private class DefaultAuthenticateContext implements ServerAuthenticator.AuthenticateContext {
379-
@Override
380-
public String ipAddress() {
381-
return ((InetSocketAddress) ctx.channel().remoteAddress())
382-
.getAddress()
383-
.getHostAddress();
384-
}
385-
}
368+
private static class DefaultAuthenticateContext
369+
implements ServerAuthenticator.AuthenticateContext {}
386370
}

fluss-rpc/src/test/java/com/alibaba/fluss/rpc/netty/authenticate/AuthenticationTest.java

Lines changed: 4 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@ public void cleanup() throws Exception {
6262
if (nettyServer != null) {
6363
nettyServer.close();
6464
}
65-
MutualAuthenticationPlugin.errorType = MutualAuthenticationPlugin.ErrorType.NONE;
6665
}
6766

6867
@Test
@@ -89,8 +88,7 @@ void testMutualAuthenticate() throws Exception {
8988
}
9089

9190
// test invalid challenge from server
92-
MutualAuthenticationPlugin.errorType =
93-
MutualAuthenticationPlugin.ErrorType.SERVER_ERROR_CHALLENGE;
91+
clientConfig.setString("client.security.mutual.error-type", "SERVER_ERROR_CHALLENGE");
9492
try (NettyClient nettyClient =
9593
new NettyClient(clientConfig, TestingClientMetricGroup.newInstance())) {
9694
assertThatThrownBy(() -> verifyGetTableNamesList(nettyClient, mutualAuthServerNode))
@@ -100,8 +98,7 @@ void testMutualAuthenticate() throws Exception {
10098
}
10199

102100
// test invalid token from client
103-
MutualAuthenticationPlugin.errorType =
104-
MutualAuthenticationPlugin.ErrorType.CLIENT_ERROR_SECOND_TOKE;
101+
clientConfig.setString("client.security.mutual.error-type", "CLIENT_ERROR_SECOND_TOKEN");
105102
try (NettyClient nettyClient =
106103
new NettyClient(clientConfig, TestingClientMetricGroup.newInstance())) {
107104
assertThatThrownBy(() -> verifyGetTableNamesList(nettyClient, mutualAuthServerNode))
@@ -114,8 +111,7 @@ void testMutualAuthenticate() throws Exception {
114111
void testNoChallengeBeforeClientComplete() throws Exception {
115112
Configuration clientConfig = new Configuration();
116113
clientConfig.set(ConfigOptions.CLIENT_SECURITY_PROTOCOL, "mutual");
117-
MutualAuthenticationPlugin.errorType =
118-
MutualAuthenticationPlugin.ErrorType.SERVER_NO_CHALLENGE;
114+
clientConfig.setString("client.security.mutual.error-type", "SERVER_NO_CHALLENGE");
119115
try (NettyClient nettyClient =
120116
new NettyClient(clientConfig, TestingClientMetricGroup.newInstance())) {
121117

@@ -130,8 +126,7 @@ void testNoChallengeBeforeClientComplete() throws Exception {
130126
void testRetirableAuthenticateException() throws Exception {
131127
Configuration clientConfig = new Configuration();
132128
clientConfig.set(ConfigOptions.CLIENT_SECURITY_PROTOCOL, "mutual");
133-
MutualAuthenticationPlugin.errorType =
134-
MutualAuthenticationPlugin.ErrorType.RETRIABLE_EXCEPTION;
129+
clientConfig.setString("client.security.mutual.error-type", "RETRIABLE_EXCEPTION");
135130
try (NettyClient nettyClient =
136131
new NettyClient(clientConfig, TestingClientMetricGroup.newInstance())) {
137132
verifyGetTableNamesList(nettyClient, mutualAuthServerNode);
@@ -208,22 +203,6 @@ void testMultiClientsWithSameProtocol() throws Exception {
208203
}
209204
}
210205

211-
@Test
212-
void testKeepAliveError() throws Exception {
213-
Configuration clientConfig = new Configuration();
214-
clientConfig.set(ConfigOptions.CLIENT_SECURITY_PROTOCOL, "mutual");
215-
try (NettyClient nettyClient =
216-
new NettyClient(clientConfig, TestingClientMetricGroup.newInstance())) {
217-
verifyGetTableNamesList(nettyClient, mutualAuthServerNode);
218-
MutualAuthenticationPlugin.errorType =
219-
MutualAuthenticationPlugin.ErrorType.KEEP_ALIVE_ERROR;
220-
assertThatThrownBy(() -> verifyGetTableNamesList(nettyClient, mutualAuthServerNode))
221-
.hasRootCauseExactlyInstanceOf(AuthenticationException.class)
222-
.rootCause()
223-
.hasMessageContaining("Keep alive error");
224-
}
225-
}
226-
227206
private void verifyGetTableNamesList(NettyClient nettyClient, ServerNode serverNode)
228207
throws Exception {
229208
ListTablesRequest request = new ListTablesRequest().setDatabaseName("test-database");

fluss-rpc/src/test/java/com/alibaba/fluss/rpc/netty/authenticate/MutualAuthenticationPlugin.java

Lines changed: 49 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.alibaba.fluss.rpc.netty.authenticate;
1818

19+
import com.alibaba.fluss.config.ConfigOption;
1920
import com.alibaba.fluss.config.Configuration;
2021
import com.alibaba.fluss.exception.AuthenticationException;
2122
import com.alibaba.fluss.exception.RetriableAuthenticationException;
@@ -25,22 +26,25 @@
2526
import com.alibaba.fluss.security.auth.ServerAuthenticationPlugin;
2627
import com.alibaba.fluss.security.auth.ServerAuthenticator;
2728

28-
import java.io.IOException;
2929
import java.util.concurrent.ThreadLocalRandom;
3030

31+
import static com.alibaba.fluss.config.ConfigBuilder.key;
32+
3133
/**
3234
* An {@link com.alibaba.fluss.security.auth.AuthenticationPlugin} to mock mutual authentication.
3335
*/
3436
public class MutualAuthenticationPlugin
3537
implements ServerAuthenticationPlugin, ClientAuthenticationPlugin {
3638

3739
private static final String MUTUAL_AUTH_PROTOCOL = "mutual";
38-
39-
public static ErrorType errorType = ErrorType.NONE;
40+
private static final ConfigOption<ErrorType> ERROR_TYPE =
41+
key("client.security.mutual.error-type")
42+
.enumType(ErrorType.class)
43+
.defaultValue(ErrorType.NONE);
4044

4145
@Override
4246
public ClientAuthenticator createClientAuthenticator(Configuration configuration) {
43-
return new ClientAuthenticatorImpl();
47+
return new ClientAuthenticatorImpl(configuration);
4448
}
4549

4650
@Override
@@ -63,6 +67,11 @@ private enum Status {
6367

6468
private Status status;
6569
Integer initialSalt;
70+
private final int errorType;
71+
72+
public ClientAuthenticatorImpl(Configuration configuration) {
73+
this.errorType = configuration.get(ERROR_TYPE).code;
74+
}
6675

6776
@Override
6877
public String protocol() {
@@ -79,16 +88,23 @@ public void initialize(AuthenticateContext context) {
7988
public byte[] authenticate(byte[] data) throws AuthenticationException {
8089
switch (status) {
8190
case SEND_CLIENT_FIRST_MESSAGE:
82-
initialSalt = generateInitialSalt();
91+
initialSalt =
92+
isError(
93+
errorType,
94+
ErrorType.SERVER_NO_CHALLENGE,
95+
ErrorType.SERVER_ERROR_CHALLENGE,
96+
ErrorType.RETRIABLE_EXCEPTION)
97+
? errorType
98+
: generateInitialSalt();
8399
status = Status.RECEIVE_SERVER_FIRST_MESSAGE;
84100
return String.valueOf(initialSalt).getBytes();
85101
case RECEIVE_SERVER_FIRST_MESSAGE:
86102
int challenge = parseToken(data);
87103
if (challenge == initialSalt + 1) {
88104
status = Status.RECEIVE_SERVER_FINAL_MESSAGE;
89105
return String.valueOf(
90-
errorType == ErrorType.CLIENT_ERROR_SECOND_TOKE
91-
? -1
106+
isError(errorType, ErrorType.CLIENT_ERROR_SECOND_TOKEN)
107+
? errorType
92108
: challenge + 1)
93109
.getBytes();
94110
} else {
@@ -122,14 +138,12 @@ private static class ServerAuthenticatorImpl implements ServerAuthenticator {
122138
private enum Status {
123139
RECEIVE_CLIENT_FIRST_MESSAGE,
124140
RECEIVE_CLIENT_FINAL_MESSAGE,
125-
COMPLETED,
126-
CLOSED
141+
COMPLETED
127142
}
128143

129144
private Integer initialSalt;
130145
private int retryNumber = 0;
131146
private Status status = Status.RECEIVE_CLIENT_FIRST_MESSAGE;
132-
private String ip;
133147

134148
@Override
135149
public String protocol() {
@@ -139,7 +153,6 @@ public String protocol() {
139153
@Override
140154
public void initialize(AuthenticateContext context) {
141155
this.status = Status.RECEIVE_CLIENT_FIRST_MESSAGE;
142-
this.ip = context.ipAddress();
143156
this.initialSalt = null;
144157
}
145158

@@ -148,14 +161,14 @@ public byte[] evaluateResponse(byte[] token) throws AuthenticationException {
148161
int tokenValue = parseToken(token);
149162
switch (status) {
150163
case RECEIVE_CLIENT_FIRST_MESSAGE:
151-
if (errorType == ErrorType.SERVER_NO_CHALLENGE) {
164+
if (isError(tokenValue, ErrorType.SERVER_NO_CHALLENGE)) {
152165
return null;
153166
}
154-
if (errorType == ErrorType.SERVER_ERROR_CHALLENGE) {
167+
if (isError(tokenValue, ErrorType.SERVER_ERROR_CHALLENGE)) {
155168
return "-1".getBytes();
156169
}
157-
if (errorType == ErrorType.RETRIABLE_EXCEPTION && retryNumber++ < 3) {
158-
throw new RetriableAuthenticationException("Retriable exception " + ip);
170+
if (isError(tokenValue, ErrorType.RETRIABLE_EXCEPTION) && retryNumber++ < 3) {
171+
throw new RetriableAuthenticationException("Retriable exception");
159172
}
160173

161174
initialSalt = tokenValue + 1;
@@ -186,18 +199,6 @@ public FlussPrincipal createPrincipal() {
186199
public boolean isCompleted() {
187200
return status == Status.COMPLETED;
188201
}
189-
190-
@Override
191-
public void keepAlive(short apiKey) throws AuthenticationException {
192-
if (errorType == ErrorType.KEEP_ALIVE_ERROR) {
193-
throw new AuthenticationException("Keep alive error");
194-
}
195-
}
196-
197-
@Override
198-
public void close() throws IOException {
199-
status = Status.CLOSED;
200-
}
201202
}
202203

203204
private static int parseToken(byte[] token) {
@@ -211,13 +212,26 @@ private static int generateInitialSalt() {
211212
return ThreadLocalRandom.current().nextInt(0, Integer.MAX_VALUE);
212213
}
213214

214-
/** Error types for testing. */
215-
public enum ErrorType {
216-
NONE,
217-
SERVER_NO_CHALLENGE,
218-
SERVER_ERROR_CHALLENGE,
219-
CLIENT_ERROR_SECOND_TOKE,
220-
RETRIABLE_EXCEPTION,
221-
KEEP_ALIVE_ERROR
215+
private static boolean isError(int errorType, ErrorType... errorTypes) {
216+
for (ErrorType type : errorTypes) {
217+
if (errorType == type.code) {
218+
return true;
219+
}
220+
}
221+
return false;
222+
}
223+
224+
enum ErrorType {
225+
NONE(-1),
226+
SERVER_NO_CHALLENGE(-2),
227+
SERVER_ERROR_CHALLENGE(-3),
228+
CLIENT_ERROR_SECOND_TOKEN(-4),
229+
RETRIABLE_EXCEPTION(-5);
230+
231+
final int code;
232+
233+
ErrorType(int code) {
234+
this.code = code;
235+
}
222236
}
223237
}

0 commit comments

Comments
 (0)