Skip to content

Commit b01c3bc

Browse files
ReplicaFetcher busy loop retry storm during leader election or bucket migration
1 parent 07721fe commit b01c3bc

File tree

2 files changed

+34
-3
lines changed

2 files changed

+34
-3
lines changed

fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1304,14 +1304,27 @@ private void maybeAddDelayedFetchLog(
13041304
boolean errorReadingData = false;
13051305
boolean hasFetchFromLocal = false;
13061306
Map<TableBucket, FetchBucketStatus> fetchBucketStatusMap = new HashMap<>();
1307+
Map<TableBucket, FetchLogResultForBucket> expectedErrorBuckets = new HashMap<>();
13071308
for (Map.Entry<TableBucket, LogReadResult> logReadResultEntry : logReadResults.entrySet()) {
13081309
TableBucket tb = logReadResultEntry.getKey();
13091310
LogReadResult logReadResult = logReadResultEntry.getValue();
13101311
FetchLogResultForBucket fetchLogResultForBucket =
13111312
logReadResult.getFetchLogResultForBucket();
13121313
if (fetchLogResultForBucket.failed()) {
1313-
errorReadingData = true;
1314-
break;
1314+
// Check if this is an expected error (like NOT_LEADER_OR_FOLLOWER,
1315+
// UNKNOWN_TABLE_OR_BUCKET, LOG_OFFSET_OUT_OF_RANGE) which should not
1316+
// short-circuit the entire fetch request.
1317+
Errors error = fetchLogResultForBucket.getError().error();
1318+
if (isNonCriticalFetchError(error)) {
1319+
// Expected errors should not prevent other buckets from being delayed.
1320+
// Save the error bucket to be returned later, and continue processing others.
1321+
expectedErrorBuckets.put(tb, fetchLogResultForBucket);
1322+
continue;
1323+
} else {
1324+
// Severe/unexpected error - short-circuit and return immediately
1325+
errorReadingData = true;
1326+
break;
1327+
}
13151328
}
13161329

13171330
if (!fetchLogResultForBucket.fetchFromRemote()) {
@@ -1347,7 +1360,11 @@ private void maybeAddDelayedFetchLog(
13471360
params,
13481361
this,
13491362
fetchBucketStatusMap,
1350-
responseCallback,
1363+
delayedResponse -> {
1364+
// Merge expected error buckets with delayed response
1365+
delayedResponse.putAll(expectedErrorBuckets);
1366+
responseCallback.accept(delayedResponse);
1367+
},
13511368
serverMetricGroup);
13521369

13531370
// try to complete the request immediately, otherwise put it into the
@@ -1361,6 +1378,19 @@ private void maybeAddDelayedFetchLog(
13611378
}
13621379
}
13631380

1381+
/**
1382+
* Check if the error is an expected fetch error that should not short-circuit the entire fetch
1383+
* request. These errors are common during normal operations (e.g., leader changes, bucket
1384+
* migrations) and should not prevent other buckets from being delayed.
1385+
*
1386+
* @param error the error to check
1387+
* @return true if the error is expected and should not short-circuit
1388+
*/
1389+
private boolean isNonCriticalFetchError(Errors error) {
1390+
return error == Errors.NOT_LEADER_OR_FOLLOWER
1391+
|| error == Errors.UNKNOWN_TABLE_OR_BUCKET_EXCEPTION;
1392+
}
1393+
13641394
private void completeDelayedOperations(TableBucket tableBucket) {
13651395
DelayedTableBucketKey delayedTableBucketKey = new DelayedTableBucketKey(tableBucket);
13661396
delayedWriteManager.checkAndComplete(delayedTableBucketKey);

fluss-server/src/main/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThread.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,7 @@ private void handleFetchLogResponse(
280280
"Remote server is not the leader for replica {}, which indicate "
281281
+ "that the replica is being moved.",
282282
tableBucket);
283+
replicasWithError.add(tableBucket);
283284
break;
284285
default:
285286
LOG.error(

0 commit comments

Comments
 (0)