Skip to content

Commit 01f95ef

Browse files
authored
[server] Add retry times for ZkSequenceIDCounter and Sender to acquire writer id from zk (#1066)
1 parent b237f1b commit 01f95ef

File tree

3 files changed

+39
-14
lines changed

3 files changed

+39
-14
lines changed

fluss-client/src/main/java/com/alibaba/fluss/client/write/IdempotenceManager.java

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.alibaba.fluss.annotation.Internal;
2020
import com.alibaba.fluss.annotation.VisibleForTesting;
21+
import com.alibaba.fluss.exception.AuthorizationException;
2122
import com.alibaba.fluss.exception.OutOfOrderSequenceException;
2223
import com.alibaba.fluss.exception.UnknownWriterIdException;
2324
import com.alibaba.fluss.metadata.PhysicalTablePath;
@@ -27,6 +28,7 @@
2728
import com.alibaba.fluss.rpc.gateway.TabletServerGateway;
2829
import com.alibaba.fluss.rpc.messages.InitWriterRequest;
2930
import com.alibaba.fluss.rpc.protocol.Errors;
31+
import com.alibaba.fluss.utils.ExceptionUtils;
3032

3133
import org.slf4j.Logger;
3234
import org.slf4j.LoggerFactory;
@@ -35,7 +37,6 @@
3537

3638
import java.util.Optional;
3739
import java.util.Set;
38-
import java.util.concurrent.ExecutionException;
3940
import java.util.stream.Collectors;
4041

4142
import static com.alibaba.fluss.record.LogRecordBatch.NO_WRITER_ID;
@@ -53,6 +54,10 @@
5354
public class IdempotenceManager {
5455
private static final Logger LOG = LoggerFactory.getLogger(IdempotenceManager.class);
5556

57+
// Retry config for requesting the writer id; consider making it configurable.
58+
private static final int RETRY_TIMES = 3;
59+
private static final int RETRY_INTERVAL_MS = 100;
60+
5661
private final boolean idempotenceEnabled;
5762
private final IdempotenceBucketMap idempotenceBucketMap;
5863
private final int maxInflightRequestsPerBucket;
@@ -283,13 +288,33 @@ synchronized boolean canRetry(WriteBatch batch, TableBucket tableBucket, Errors
283288
return false;
284289
}
285290

286-
void maybeWaitForWriterId(Set<PhysicalTablePath> tablePaths)
287-
throws ExecutionException, InterruptedException {
288-
if (!isWriterIdValid()) {
289-
tabletServerGateway
290-
.initWriter(prepareInitWriterRequest(tablePaths))
291-
.thenAccept(response -> setWriterId(response.getWriterId()))
292-
.get(); // TODO: can optimize into async response handling.
291+
void maybeWaitForWriterId(Set<PhysicalTablePath> tablePaths) throws Throwable {
292+
if (isWriterIdValid()) {
293+
return;
294+
}
295+
296+
int retryCount = 0;
297+
while (true) {
298+
try {
299+
tabletServerGateway
300+
.initWriter(prepareInitWriterRequest(tablePaths))
301+
.thenAccept(response -> setWriterId(response.getWriterId()))
302+
.get(); // TODO: can optimize into async response handling.
303+
return;
304+
} catch (Exception e) {
305+
Throwable t = ExceptionUtils.stripExecutionException(e);
306+
if (t instanceof AuthorizationException || retryCount >= RETRY_TIMES) {
307+
throw t;
308+
} else {
309+
LOG.warn(
310+
"Failed to init writer id, the retry count: {}, error message: {}",
311+
retryCount,
312+
t.getMessage());
313+
retryCount++;
314+
long delayMs = (long) (RETRY_INTERVAL_MS * Math.pow(2, retryCount));
315+
Thread.sleep(delayMs);
316+
}
317+
}
293318
}
294319
}
295320

fluss-client/src/main/java/com/alibaba/fluss/client/write/Sender.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -178,9 +178,7 @@ public void runOnce() throws Exception {
178178
// TODO: only request to init writer_id when we have valid target tables
179179
try {
180180
idempotenceManager.maybeWaitForWriterId(targetTables);
181-
} catch (Exception e) {
182-
Throwable t = ExceptionUtils.stripExecutionException(e);
183-
181+
} catch (Throwable t) {
184182
// TODO: If 'only request to init writer_id when we have valid target tables' has
185183
// been done, this if check can be removed.
186184
if (!targetTables.isEmpty()) {

fluss-server/src/main/java/com/alibaba/fluss/server/zk/ZkSequenceIDCounter.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import com.alibaba.fluss.shaded.curator5.org.apache.curator.framework.CuratorFramework;
2121
import com.alibaba.fluss.shaded.curator5.org.apache.curator.framework.recipes.atomic.AtomicValue;
2222
import com.alibaba.fluss.shaded.curator5.org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
23-
import com.alibaba.fluss.shaded.curator5.org.apache.curator.retry.RetryNTimes;
23+
import com.alibaba.fluss.shaded.curator5.org.apache.curator.retry.BoundedExponentialBackoffRetry;
2424

2525
import javax.annotation.concurrent.ThreadSafe;
2626

@@ -30,7 +30,8 @@ public class ZkSequenceIDCounter implements SequenceIDCounter {
3030

3131
// TODO: consider making these retry settings configurable.
3232
private static final int RETRY_TIMES = 10;
33-
private static final int RETRY_INTERVAL_MS = 100;
33+
private static final int BASE_SLEEP_MS = 100;
34+
private static final int MAX_SLEEP_MS = 1000;
3435

3536
private final DistributedAtomicLong sequenceIdCounter;
3637

@@ -39,7 +40,8 @@ public ZkSequenceIDCounter(CuratorFramework curatorClient, String sequenceIDPath
3940
new DistributedAtomicLong(
4041
curatorClient,
4142
sequenceIDPath,
42-
new RetryNTimes(RETRY_TIMES, RETRY_INTERVAL_MS));
43+
new BoundedExponentialBackoffRetry(
44+
BASE_SLEEP_MS, MAX_SLEEP_MS, RETRY_TIMES));
4345
}
4446

4547
/**

0 commit comments

Comments
 (0)