Skip to content

Commit ac5574e

Browse files
committed
[server] Retry authentication when a retriable authentication exception occurs.
1 parent 93a2bd7 commit ac5574e

File tree

9 files changed

+228
-78
lines changed

9 files changed

+228
-78
lines changed
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright (c) 2025 Alibaba Group Holding Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.alibaba.fluss.exception;
18+
19+
/**
20+
* This exception is thrown if authentication fails with a retriable error.
21+
*
22+
* @since 0.7
23+
*/
24+
public class RetriableAuthenticationException extends AuthenticationException {
25+
public RetriableAuthenticationException(String message) {
26+
super(message);
27+
}
28+
}

fluss-common/src/main/java/com/alibaba/fluss/security/auth/ClientAuthenticator.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.alibaba.fluss.annotation.PublicEvolving;
2020
import com.alibaba.fluss.exception.AuthenticationException;
21+
import com.alibaba.fluss.shaded.netty4.io.netty.channel.Channel;
2122

2223
import javax.annotation.Nullable;
2324

@@ -28,6 +29,9 @@ public interface ClientAuthenticator {
2829
/** The protocol name of the authenticator, which will be sent in the AuthenticateRequest. */
2930
String protocol();
3031

32+
/** Initialize the authenticator. */
33+
default void initialize(AuthenticateContext context) {}
34+
3135
/**
3236
* Generates the initial token or calculates a token based on the server's challenge, then
3337
* sends it back to the server. This method sets the client authentication status as complete if
@@ -75,4 +79,9 @@ public interface ClientAuthenticator {
7579

7680
/** Checks if the authentication from client side is completed. */
7781
boolean isCompleted();
82+
83+
/** The context of the authentication process. */
84+
interface AuthenticateContext {
85+
Channel channel();
86+
}
7887
}

fluss-common/src/main/java/com/alibaba/fluss/security/auth/ServerAuthenticator.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.alibaba.fluss.annotation.PublicEvolving;
2020
import com.alibaba.fluss.exception.AuthenticationException;
2121
import com.alibaba.fluss.security.acl.FlussPrincipal;
22+
import com.alibaba.fluss.shaded.netty4.io.netty.channel.Channel;
2223

2324
/**
2425
* Authenticator for server side.
@@ -30,6 +31,9 @@ public interface ServerAuthenticator {
3031

3132
String protocol();
3233

34+
/** Initialize the authenticator. */
35+
default void initialize(AuthenticateContext context) {}
36+
3337
/**
3438
* Generates the challenge based on the client's token, then sends it back to the client. This
3539
* method sets the server authentication status as complete if the authentication succeeds.
@@ -79,4 +83,9 @@ public interface ServerAuthenticator {
7983
* complete).
8084
*/
8185
FlussPrincipal createPrincipal();
86+
87+
/** The context of the authentication process. */
88+
interface AuthenticateContext {
89+
Channel channel();
90+
}
8291
}

fluss-common/src/test/java/com/alibaba/fluss/utils/ExponentialBackoffTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public void testExponentialBackoffWithoutJitter() {
5252
ExponentialBackoff exponentialBackoff = new ExponentialBackoff(100, 2, 400, 0.0);
5353
assertThat(exponentialBackoff.backoff(0)).isEqualTo(100);
5454
assertThat(exponentialBackoff.backoff(1)).isEqualTo(200);
55-
assertThat(exponentialBackoff.backoff(2)).isEqualTo(300);
55+
assertThat(exponentialBackoff.backoff(2)).isEqualTo(400);
5656
assertThat(exponentialBackoff.backoff(3)).isEqualTo(400);
5757
}
5858
}

fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/client/ServerConnection.java

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.alibaba.fluss.exception.DisconnectException;
2121
import com.alibaba.fluss.exception.FlussRuntimeException;
2222
import com.alibaba.fluss.exception.NetworkException;
23+
import com.alibaba.fluss.exception.RetriableAuthenticationException;
2324
import com.alibaba.fluss.rpc.messages.ApiMessage;
2425
import com.alibaba.fluss.rpc.messages.ApiVersionsRequest;
2526
import com.alibaba.fluss.rpc.messages.ApiVersionsResponse;
@@ -38,6 +39,7 @@
3839
import com.alibaba.fluss.shaded.netty4.io.netty.channel.Channel;
3940
import com.alibaba.fluss.shaded.netty4.io.netty.channel.ChannelFuture;
4041
import com.alibaba.fluss.shaded.netty4.io.netty.channel.ChannelFutureListener;
42+
import com.alibaba.fluss.utils.ExponentialBackoff;
4143
import com.alibaba.fluss.utils.MapUtils;
4244

4345
import org.slf4j.Logger;
@@ -52,6 +54,7 @@
5254
import java.util.ArrayDeque;
5355
import java.util.Map;
5456
import java.util.concurrent.CompletableFuture;
57+
import java.util.concurrent.TimeUnit;
5558
import java.util.function.Consumer;
5659

5760
/** Connection to a Netty server used by the {@link NettyClient}. */
@@ -66,6 +69,7 @@ final class ServerConnection {
6669
private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
6770
private final ConnectionMetricGroup connectionMetricGroup;
6871
private final ClientAuthenticator authenticator;
72+
private final ExponentialBackoff backoff;
6973

7074
private final Object lock = new Object();
7175

@@ -87,6 +91,9 @@ final class ServerConnection {
8791
@GuardedBy("lock")
8892
private ServerApiVersions serverApiVersions;
8993

94+
@GuardedBy("lock")
95+
private int retryAuthCount = 0;
96+
9097
ServerConnection(
9198
Bootstrap bootstrap,
9299
ServerNode node,
@@ -99,6 +106,7 @@ final class ServerConnection {
99106
.connect(node.host(), node.port())
100107
.addListener((ChannelFutureListener) this::establishConnection);
101108
this.authenticator = authenticator;
109+
this.backoff = new ExponentialBackoff(100L, 2, 5000L, 0.2);
102110
}
103111

104112
public ServerNode getServerNode() {
@@ -366,13 +374,18 @@ private void handleApiVersionsResponse(ApiMessage response, Throwable cause) {
366374
synchronized (lock) {
367375
serverApiVersions =
368376
new ServerApiVersions(((ApiVersionsResponse) response).getApiVersionsList());
369-
LOG.debug("Begin to authenticate with protocol {}", authenticator.protocol());
370377
// send initial token
371-
sendAuthenticate(new byte[0]);
378+
sendInitialToken();
372379
}
373380
}
374381

375-
private void sendAuthenticate(byte[] challenge) {
382+
private void sendInitialToken() {
383+
authenticator.initialize(new DefaultAuthenticateContext());
384+
LOG.debug("Begin to authenticate with protocol {}", authenticator.protocol());
385+
sendAuthenticateRequest(new byte[0]);
386+
}
387+
388+
private void sendAuthenticateRequest(byte[] challenge) {
376389
try {
377390
if (!authenticator.isCompleted()) {
378391
byte[] token = authenticator.authenticate(challenge);
@@ -404,7 +417,16 @@ private void sendAuthenticate(byte[] challenge) {
404417

405418
private void handleAuthenticateResponse(ApiMessage response, Throwable cause) {
406419
if (cause != null) {
407-
close(cause);
420+
if (cause instanceof RetriableAuthenticationException) {
421+
LOG.warn("Authentication failed, retrying {} times", retryAuthCount, cause);
422+
channel.eventLoop()
423+
.schedule(
424+
this::sendInitialToken,
425+
backoff.backoff(retryAuthCount++),
426+
TimeUnit.MILLISECONDS);
427+
} else {
428+
close(cause);
429+
}
408430
return;
409431
}
410432
if (!(response instanceof AuthenticateResponse)) {
@@ -415,7 +437,7 @@ private void handleAuthenticateResponse(ApiMessage response, Throwable cause) {
415437
synchronized (lock) {
416438
AuthenticateResponse authenticateResponse = (AuthenticateResponse) response;
417439
if (authenticateResponse.hasChallenge()) {
418-
sendAuthenticate(((AuthenticateResponse) response).getChallenge());
440+
sendAuthenticateRequest(((AuthenticateResponse) response).getChallenge());
419441
} else if (authenticator.isCompleted()) {
420442
switchState(ConnectionState.READY);
421443
} else {
@@ -521,4 +543,11 @@ ByteBuf toByteBuf(ByteBufAllocator allocator) {
521543
return MessageCodec.encodeRequest(allocator, apiKey, apiVersion, requestId, request);
522544
}
523545
}
546+
547+
private class DefaultAuthenticateContext implements ClientAuthenticator.AuthenticateContext {
548+
@Override
549+
public Channel channel() {
550+
return channel;
551+
}
552+
}
524553
}

fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/server/NettyServerHandler.java

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.alibaba.fluss.annotation.VisibleForTesting;
2020
import com.alibaba.fluss.exception.AuthenticationException;
2121
import com.alibaba.fluss.exception.NetworkException;
22+
import com.alibaba.fluss.exception.RetriableAuthenticationException;
2223
import com.alibaba.fluss.record.send.Send;
2324
import com.alibaba.fluss.rpc.messages.ApiMessage;
2425
import com.alibaba.fluss.rpc.messages.AuthenticateRequest;
@@ -32,6 +33,7 @@
3233
import com.alibaba.fluss.security.auth.ServerAuthenticator;
3334
import com.alibaba.fluss.shaded.netty4.io.netty.buffer.ByteBuf;
3435
import com.alibaba.fluss.shaded.netty4.io.netty.buffer.ByteBufAllocator;
36+
import com.alibaba.fluss.shaded.netty4.io.netty.channel.Channel;
3537
import com.alibaba.fluss.shaded.netty4.io.netty.channel.ChannelFutureListener;
3638
import com.alibaba.fluss.shaded.netty4.io.netty.channel.ChannelHandlerContext;
3739
import com.alibaba.fluss.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
@@ -162,6 +164,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
162164
super.channelActive(ctx);
163165
this.ctx = ctx;
164166
this.remoteAddress = ctx.channel().remoteAddress();
167+
authenticator.initialize(new DefaultAuthenticateContext());
165168
switchState(
166169
authenticator.isCompleted()
167170
? ConnectionState.READY
@@ -316,14 +319,27 @@ private void handleAuthenticateRequest(
316319
}
317320

318321
AuthenticateResponse authenticateResponse = new AuthenticateResponse();
319-
if (!authenticator.isCompleted()) {
320-
byte[] token = authenticateRequest.getToken();
321-
byte[] challenge = authenticator.evaluateResponse(token);
322-
if (!authenticator.isCompleted() && challenge != null) {
323-
authenticateResponse.setChallenge(challenge);
322+
try {
323+
if (!authenticator.isCompleted()) {
324+
byte[] token = authenticateRequest.getToken();
325+
byte[] challenge = authenticator.evaluateResponse(token);
326+
if (challenge != null) {
327+
authenticateResponse.setChallenge(challenge);
328+
}
329+
}
330+
future.complete(authenticateResponse);
331+
} catch (AuthenticationException e) {
332+
if (e instanceof RetriableAuthenticationException) {
333+
LOG.warn(
334+
"Authentication from {} failed due to a retriable exception: {}. Reinitializing authenticator for subsequent retries.",
335+
ctx.channel().remoteAddress(),
336+
e.getMessage(),
337+
e);
338+
authenticator.initialize(new DefaultAuthenticateContext());
324339
}
340+
341+
future.completeExceptionally(e);
325342
}
326-
future.complete(authenticateResponse);
327343

328344
if (authenticator.isCompleted()) {
329345
switchState(ConnectionState.READY);
@@ -349,4 +365,11 @@ public boolean isAuthenticating() {
349365
return this == AUTHENTICATING;
350366
}
351367
}
368+
369+
private class DefaultAuthenticateContext implements ServerAuthenticator.AuthenticateContext {
370+
@Override
371+
public Channel channel() {
372+
return ctx.channel();
373+
}
374+
}
352375
}

fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/Errors.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import com.alibaba.fluss.exception.PartitionAlreadyExistsException;
5454
import com.alibaba.fluss.exception.PartitionNotExistException;
5555
import com.alibaba.fluss.exception.RecordTooLargeException;
56+
import com.alibaba.fluss.exception.RetriableAuthenticationException;
5657
import com.alibaba.fluss.exception.SchemaNotExistException;
5758
import com.alibaba.fluss.exception.SecurityDisabledException;
5859
import com.alibaba.fluss.exception.SecurityTokenException;
@@ -61,7 +62,6 @@
6162
import com.alibaba.fluss.exception.TableNotExistException;
6263
import com.alibaba.fluss.exception.TableNotPartitionedException;
6364
import com.alibaba.fluss.exception.TimeoutException;
64-
import com.alibaba.fluss.exception.TooManyBucketsException;
6565
import com.alibaba.fluss.exception.TooManyPartitionsException;
6666
import com.alibaba.fluss.exception.UnknownServerException;
6767
import com.alibaba.fluss.exception.UnknownTableOrBucketException;
@@ -198,8 +198,10 @@ public enum Errors {
198198
AUTHENTICATE_EXCEPTION(46, "Authentication failed.", AuthenticationException::new),
199199
SECURITY_DISABLED_EXCEPTION(47, "Security is disabled.", SecurityDisabledException::new),
200200
AUTHORIZATION_EXCEPTION(48, "Authorization failed", AuthorizationException::new),
201-
BUCKET_MAX_NUM_EXCEPTION(
202-
49, "Exceed the maximum number of buckets", TooManyBucketsException::new);
201+
RETRIABLE_AUTHENTICATE_EXCEPTION(
202+
49,
203+
"Authentication failed with retriable exception. ",
204+
RetriableAuthenticationException::new);
203205

204206
private static final Logger LOG = LoggerFactory.getLogger(Errors.class);
205207

fluss-rpc/src/test/java/com/alibaba/fluss/rpc/netty/authenticate/AuthenticationTest.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,17 @@ void testNoChallengeBeforeClientComplete() throws Exception {
122122
}
123123
}
124124

125+
@Test
126+
void testRetirableAuthenticateException() throws Exception {
127+
Configuration clientConfig = new Configuration();
128+
clientConfig.set(ConfigOptions.CLIENT_SECURITY_PROTOCOL, "mutual");
129+
clientConfig.setString("client.security.mutual.error-type", "RETRIABLE_EXCEPTION");
130+
try (NettyClient nettyClient =
131+
new NettyClient(clientConfig, TestingClientMetricGroup.newInstance())) {
132+
verifyGetTableNamesList(nettyClient, mutualAuthServerNode);
133+
}
134+
}
135+
125136
@Test
126137
void testClientLackAuthenticateProtocol() throws Exception {
127138
Configuration clientConfig = new Configuration();

0 commit comments

Comments
 (0)