Skip to content

Commit be162a4

Browse files
authored
[hotfix] Improve bucket ready check logic (#565)
1 parent e1d2b39 commit be162a4

File tree

1 file changed

+3
-3
lines changed

1 file changed

+3
-3
lines changed

fluss-client/src/main/java/com/alibaba/fluss/client/write/RecordAccumulator.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -393,11 +393,8 @@ private long bucketReady(
393393

394394
boolean exhausted = writerBufferPool.queued() > 0;
395395
for (Map.Entry<Integer, Deque<WriteBatch>> entry : batches.entrySet()) {
396-
int bucketId = entry.getKey();
397396
Deque<WriteBatch> deque = entry.getValue();
398397

399-
TableBucket tableBucket = cluster.getTableBucket(physicalTablePath, bucketId);
400-
ServerNode leader = cluster.leaderFor(tableBucket);
401398
final long waitedTimeMs;
402399
final int dequeSize;
403400
final boolean full;
@@ -419,6 +416,9 @@ private long bucketReady(
419416
full = dequeSize > 1 || batch.isClosed();
420417
}
421418

419+
int bucketId = entry.getKey();
420+
TableBucket tableBucket = cluster.getTableBucket(physicalTablePath, bucketId);
421+
ServerNode leader = cluster.leaderFor(tableBucket);
422422
if (leader == null) {
423423
// This is a bucket for which leader is not known, but messages are
424424
// available to send. Note that entries are currently not removed from

0 commit comments

Comments
 (0)