Skip to content

Commit aa04bb4

Browse files
loserwang1024 authored and polyzos committed
[server] Add retriable authentication exception. (apache#845)
1 parent d832e4b commit aa04bb4

File tree

11 files changed

+371
-83
lines changed

11 files changed

+371
-83
lines changed

LICENSE

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,7 @@ Apache Kafka
311311
./fluss-common/src/main/java/com/alibaba/fluss/utils/crc/PureJavaCrc32C.java
312312
./fluss-common/src/main/java/com/alibaba/fluss/utils/log/ByteBufferUnmapper.java
313313
./fluss-common/src/main/java/com/alibaba/fluss/utils/log/FairBucketStatusMap.java
314+
./fluss-common/src/main/java/com/alibaba/fluss/utils/ExponentialBackoff.java
314315
./fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/statemachine/ReplicaStateMachine.java
315316
./fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/statemachine/TableBucketStateMachine.java
316317
./fluss-server/src/main/java/com/alibaba/fluss/server/log/AbstractIndex.java
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright (c) 2025 Alibaba Group Holding Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.alibaba.fluss.exception;
18+
19+
/**
20+
* This exception is thrown if authentication fails with a retriable error.
21+
*
22+
* @since 0.7
23+
*/
24+
public class RetriableAuthenticationException extends AuthenticationException {
25+
public RetriableAuthenticationException(String message) {
26+
super(message);
27+
}
28+
}

fluss-common/src/main/java/com/alibaba/fluss/security/auth/ClientAuthenticator.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ public interface ClientAuthenticator {
2828
/** The protocol name of the authenticator, which will send in the AuthenticateRequest. */
2929
String protocol();
3030

31+
/** Initialize the authenticator. */
32+
default void initialize(AuthenticateContext context) {}
33+
3134
/**
3235
* * Generates the initial token or calculates a token based on the server's challenge, then
3336
* sends it back to the server. This method sets the client authentication status as complete if
@@ -75,4 +78,7 @@ public interface ClientAuthenticator {
7578

7679
/** Checks if the authentication from client side is completed. */
7780
boolean isCompleted();
81+
82+
/** The context of the authentication process. */
83+
interface AuthenticateContext {}
7884
}

fluss-common/src/main/java/com/alibaba/fluss/security/auth/ServerAuthenticator.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ public interface ServerAuthenticator {
3030

3131
String protocol();
3232

33+
/** Initialize the authenticator. */
34+
default void initialize(AuthenticateContext context) {}
35+
3336
/**
3437
* * Generates the challenge based on the client's token, then sends it back to the client. This
3538
* method sets the server authentication status as complete if the authentication succeeds.
@@ -79,4 +82,7 @@ public interface ServerAuthenticator {
7982
* complete).
8083
*/
8184
FlussPrincipal createPrincipal();
85+
86+
/** The context of the authentication process. */
87+
interface AuthenticateContext {}
8288
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright (c) 2025 Alibaba Group Holding Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.alibaba.fluss.utils;
18+
19+
import javax.annotation.concurrent.ThreadSafe;
20+
21+
import java.util.concurrent.ThreadLocalRandom;
22+
23+
/* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache
24+
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
25+
* additional information regarding copyright ownership. */
26+
27+
/**
28+
* A utility class for computing exponential backoff values, commonly used for retry logic,
29+
* reconnection attempts, and timeout management.
30+
*
31+
* <p>The backoff interval increases exponentially with each attempt, following the formula:
32+
*
33+
* <pre>
34+
* Backoff(attempts) = random(1 - jitter, 1 + jitter) * initialInterval * multiplier^attempts
35+
* </pre>
36+
*
37+
* <p>If {@code maxInterval} is less than {@code initialInterval}, a constant backoff of {@code
38+
* maxInterval} will be applied. The jitter factor ensures randomness to avoid thundering herd
39+
* problems, but will never cause the result to exceed the configured maximum interval.
40+
*
41+
* <p>Instances of this class are thread-safe and can be shared across multiple threads.
42+
*/
43+
@ThreadSafe
44+
public class ExponentialBackoff {
    private final long initialInterval;
    private final int multiplier;
    private final long maxInterval;
    private final double jitter;
    /** Largest exponent that keeps the un-jittered term at or below {@code maxInterval}. */
    private final double expMax;

    /**
     * Creates a backoff calculator.
     *
     * @param initialInterval the base interval used for attempt {@code 0}; capped at {@code
     *     maxInterval}
     * @param multiplier the growth factor applied per attempt; a value for which no exponent cap
     *     can be computed (e.g. a negative multiplier) degrades to a constant backoff of the
     *     initial interval
     * @param maxInterval the upper bound of every value returned by {@link #backoff(long)}
     * @param jitter the relative randomization; the computed term is scaled by a random factor in
     *     {@code [1 - jitter, 1 + jitter)}. Values below {@link Double#MIN_NORMAL} disable
     *     randomization
     */
    public ExponentialBackoff(
            long initialInterval, int multiplier, long maxInterval, double jitter) {
        this.initialInterval = Math.min(maxInterval, initialInterval);
        this.multiplier = multiplier;
        this.maxInterval = maxInterval;
        this.jitter = jitter;
        this.expMax = computeExpMax(initialInterval, multiplier, maxInterval);
    }

    /** Computes the exponent at which the un-jittered term reaches {@code maxInterval}. */
    private static double computeExpMax(long initialInterval, int multiplier, long maxInterval) {
        if (maxInterval <= initialInterval) {
            return 0;
        }
        double exp =
                Math.log(maxInterval / (double) Math.max(initialInterval, 1))
                        / Math.log(multiplier);
        // Math.log of a negative multiplier is NaN. A NaN exponent would propagate through
        // Math.pow and the long cast and make backoff() return 0 (i.e. retry without any delay),
        // so fall back to a constant backoff instead.
        return Double.isNaN(exp) ? 0 : exp;
    }

    /**
     * Computes the backoff interval for the given number of attempts.
     *
     * @param attempts the number of attempts made so far; negative values are treated as {@code 0}
     *     so the result never drops below the initial interval because of a bad counter
     * @return the backoff interval, never negative on the exponential path and never larger than
     *     the configured maximum interval
     */
    public long backoff(long attempts) {
        if (expMax == 0) {
            return initialInterval;
        }
        double exp = Math.min(Math.max(attempts, 0L), this.expMax);
        double term = initialInterval * Math.pow(multiplier, exp);
        double randomFactor =
                jitter < Double.MIN_NORMAL
                        ? 1.0
                        : ThreadLocalRandom.current().nextDouble(1 - jitter, 1 + jitter);
        long backoffValue = (long) (randomFactor * term);
        // A jitter above 1 can yield a negative random factor; a negative delay is meaningless
        // for schedulers, so clamp the result into [0, maxInterval].
        return Math.max(0L, Math.min(backoffValue, maxInterval));
    }

    @Override
    public String toString() {
        return "ExponentialBackoff{"
                + "multiplier="
                + multiplier
                + ", expMax="
                + expMax
                + ", initialInterval="
                + initialInterval
                + ", maxInterval="
                + maxInterval
                + ", jitter="
                + jitter
                + '}';
    }
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright (c) 2025 Alibaba Group Holding Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.alibaba.fluss.utils;
18+
19+
import org.assertj.core.data.Percentage;
20+
import org.junit.jupiter.api.Test;
21+
22+
import static org.assertj.core.api.Assertions.assertThat;
23+
24+
/** Test for {@link ExponentialBackoff}. */
25+
public class ExponentialBackoffTest {
26+
@Test
27+
public void testExponentialBackoff() {
28+
long initialValue = 100;
29+
int ratio = 2;
30+
long backoffMax = 2000;
31+
double jitter = 0.2;
32+
ExponentialBackoff exponentialBackoff =
33+
new ExponentialBackoff(initialValue, ratio, backoffMax, jitter);
34+
35+
for (int i = 0; i <= 100; i++) {
36+
for (int attempts = 0; attempts <= 10; attempts++) {
37+
if (attempts <= 4) {
38+
assertThat(1.0 * exponentialBackoff.backoff(attempts))
39+
.isCloseTo(
40+
initialValue * Math.pow(ratio, attempts),
41+
Percentage.withPercentage(jitter * 100));
42+
} else {
43+
assertThat(exponentialBackoff.backoff(attempts) <= backoffMax * (1 + jitter))
44+
.isTrue();
45+
}
46+
}
47+
}
48+
}
49+
50+
@Test
51+
public void testExponentialBackoffWithoutJitter() {
52+
ExponentialBackoff exponentialBackoff = new ExponentialBackoff(100, 2, 400, 0.0);
53+
assertThat(exponentialBackoff.backoff(0)).isEqualTo(100);
54+
assertThat(exponentialBackoff.backoff(1)).isEqualTo(200);
55+
assertThat(exponentialBackoff.backoff(2)).isEqualTo(400);
56+
assertThat(exponentialBackoff.backoff(3)).isEqualTo(400);
57+
}
58+
}

fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/client/ServerConnection.java

Lines changed: 38 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.alibaba.fluss.exception.DisconnectException;
2121
import com.alibaba.fluss.exception.FlussRuntimeException;
2222
import com.alibaba.fluss.exception.NetworkException;
23+
import com.alibaba.fluss.exception.RetriableAuthenticationException;
2324
import com.alibaba.fluss.rpc.messages.ApiMessage;
2425
import com.alibaba.fluss.rpc.messages.ApiVersionsRequest;
2526
import com.alibaba.fluss.rpc.messages.ApiVersionsResponse;
@@ -38,6 +39,7 @@
3839
import com.alibaba.fluss.shaded.netty4.io.netty.channel.Channel;
3940
import com.alibaba.fluss.shaded.netty4.io.netty.channel.ChannelFuture;
4041
import com.alibaba.fluss.shaded.netty4.io.netty.channel.ChannelFutureListener;
42+
import com.alibaba.fluss.utils.ExponentialBackoff;
4143
import com.alibaba.fluss.utils.MapUtils;
4244

4345
import org.slf4j.Logger;
@@ -52,6 +54,7 @@
5254
import java.util.ArrayDeque;
5355
import java.util.Map;
5456
import java.util.concurrent.CompletableFuture;
57+
import java.util.concurrent.TimeUnit;
5558
import java.util.function.Consumer;
5659

5760
/** Connection to a Netty server used by the {@link NettyClient}. */
@@ -66,6 +69,7 @@ final class ServerConnection {
6669
private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
6770
private final ConnectionMetricGroup connectionMetricGroup;
6871
private final ClientAuthenticator authenticator;
72+
private final ExponentialBackoff backoff;
6973

7074
private final Object lock = new Object();
7175

@@ -87,6 +91,9 @@ final class ServerConnection {
8791
@GuardedBy("lock")
8892
private ServerApiVersions serverApiVersions;
8993

94+
@GuardedBy("lock")
95+
private int retryAuthCount = 0;
96+
9097
ServerConnection(
9198
Bootstrap bootstrap,
9299
ServerNode node,
@@ -99,6 +106,7 @@ final class ServerConnection {
99106
.connect(node.host(), node.port())
100107
.addListener((ChannelFutureListener) this::establishConnection);
101108
this.authenticator = authenticator;
109+
this.backoff = new ExponentialBackoff(100L, 2, 5000L, 0.2);
102110
}
103111

104112
public ServerNode getServerNode() {
@@ -366,13 +374,18 @@ private void handleApiVersionsResponse(ApiMessage response, Throwable cause) {
366374
synchronized (lock) {
367375
serverApiVersions =
368376
new ServerApiVersions(((ApiVersionsResponse) response).getApiVersionsList());
369-
LOG.debug("Begin to authenticate with protocol {}", authenticator.protocol());
370377
// send initial token
371-
sendAuthenticate(new byte[0]);
378+
sendInitialToken();
372379
}
373380
}
374381

375-
private void sendAuthenticate(byte[] challenge) {
382+
private void sendInitialToken() {
383+
authenticator.initialize(new DefaultAuthenticateContext());
384+
LOG.debug("Begin to authenticate with protocol {}", authenticator.protocol());
385+
sendAuthenticateRequest(new byte[0]);
386+
}
387+
388+
private void sendAuthenticateRequest(byte[] challenge) {
376389
try {
377390
if (!authenticator.isCompleted()) {
378391
byte[] token = authenticator.authenticate(challenge);
@@ -403,19 +416,28 @@ private void sendAuthenticate(byte[] challenge) {
403416
}
404417

405418
private void handleAuthenticateResponse(ApiMessage response, Throwable cause) {
406-
if (cause != null) {
407-
close(cause);
408-
return;
409-
}
410-
if (!(response instanceof AuthenticateResponse)) {
411-
close(new IllegalStateException("Unexpected response type " + response.getClass()));
412-
return;
413-
}
414-
415419
synchronized (lock) {
420+
if (cause != null) {
421+
if (cause instanceof RetriableAuthenticationException) {
422+
LOG.warn("Authentication failed, retrying {} times", retryAuthCount, cause);
423+
channel.eventLoop()
424+
.schedule(
425+
this::sendInitialToken,
426+
backoff.backoff(retryAuthCount++),
427+
TimeUnit.MILLISECONDS);
428+
} else {
429+
close(cause);
430+
}
431+
return;
432+
}
433+
if (!(response instanceof AuthenticateResponse)) {
434+
close(new IllegalStateException("Unexpected response type " + response.getClass()));
435+
return;
436+
}
437+
416438
AuthenticateResponse authenticateResponse = (AuthenticateResponse) response;
417439
if (authenticateResponse.hasChallenge()) {
418-
sendAuthenticate(((AuthenticateResponse) response).getChallenge());
440+
sendAuthenticateRequest(((AuthenticateResponse) response).getChallenge());
419441
} else if (authenticator.isCompleted()) {
420442
switchState(ConnectionState.READY);
421443
} else {
@@ -521,4 +543,7 @@ ByteBuf toByteBuf(ByteBufAllocator allocator) {
521543
return MessageCodec.encodeRequest(allocator, apiKey, apiVersion, requestId, request);
522544
}
523545
}
546+
547+
private static class DefaultAuthenticateContext
548+
implements ClientAuthenticator.AuthenticateContext {}
524549
}

0 commit comments

Comments
 (0)