Skip to content

Commit 5e66329

Browse files
committed
fix set bucket
1 parent 8695b1d commit 5e66329

File tree

2 files changed

+6
-14
lines changed

2 files changed

+6
-14
lines changed

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/FullScanBatchScanner.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -40,16 +40,15 @@
4040
import java.nio.ByteBuffer;
4141
import java.time.Duration;
4242
import java.util.ArrayList;
43-
import java.util.HashSet;
4443
import java.util.List;
4544
import java.util.Objects;
4645
import java.util.concurrent.CompletableFuture;
4746
import java.util.concurrent.ExecutionException;
4847
import java.util.concurrent.TimeUnit;
4948

5049
/**
51-
* BatchScanner that performs a FULL_SCAN snapshot across all leaders of a KV table (or a
52-
* single partition) and exposes the result via pollBatch in a bounded fashion.
50+
* BatchScanner that performs a FULL_SCAN snapshot across all leaders of a KV table (or a single
51+
* partition) and exposes the result via pollBatch in a bounded fashion.
5352
*/
5453
@Internal
5554
public class FullScanBatchScanner implements BatchScanner {
@@ -86,17 +85,13 @@ private void startFullScan() {
8685
long tableId = tableInfo.getTableId();
8786
int numBuckets = tableInfo.getNumBuckets();
8887

89-
// Find leader tablet/servers for this table/partition
90-
HashSet<Integer> leaderServers = new HashSet<>();
88+
// Send full scan request per bucket to its leader tablet/server
89+
List<CompletableFuture<FullScanResponse>> responseFutures = new ArrayList<>();
9190
for (int bucketId = 0; bucketId < numBuckets; bucketId++) {
9291
TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId);
9392
metadataUpdater.checkAndUpdateMetadata(tableInfo.getTablePath(), tableBucket);
9493
int leader = metadataUpdater.leaderFor(tableBucket);
95-
leaderServers.add(leader);
96-
}
9794

98-
List<CompletableFuture<FullScanResponse>> responseFutures = new ArrayList<>();
99-
for (int leader : leaderServers) {
10095
TabletServerGateway gateway = metadataUpdater.newTabletServerClientForNode(leader);
10196
if (gateway == null) {
10297
rowsFuture.completeExceptionally(
@@ -108,8 +103,7 @@ private void startFullScan() {
108103
}
109104
FullScanRequest request = new FullScanRequest();
110105
request.setTableId(tableId);
111-
request.setBucketId(0);
112-
106+
request.setBucketId(bucketId);
113107
if (partitionId != null) {
114108
request.setPartitionId(partitionId);
115109
}

fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/FullScanBatchScannerITCase.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,7 @@
5757
import static org.assertj.core.api.Assertions.assertThat;
5858
import static org.assertj.core.api.Assertions.assertThatThrownBy;
5959

60-
/**
61-
* End-to-end IT cases for FullScanBatchScanner functionality.
62-
*/
60+
/** End-to-end IT cases for FullScanBatchScanner functionality. */
6361
class FullScanBatchScannerITCase {
6462

6563
@RegisterExtension

0 commit comments

Comments
 (0)