Skip to content

Commit a339158

Browse files
authored
[client] Fix NPE thrown by MetadataUpdater when getting bucket count (apache#2054)
1 parent b7203a7 commit a339158

File tree

6 files changed

+19
-19
lines changed

6 files changed

+19
-19
lines changed

fluss-client/src/main/java/org/apache/fluss/client/write/RoundRobinBucketAssigner.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,12 @@
3131
@Internal
3232
public class RoundRobinBucketAssigner extends DynamicBucketAssigner {
3333
private final PhysicalTablePath physicalTablePath;
34+
private final int bucketNumber;
3435
private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());
3536

36-
public RoundRobinBucketAssigner(PhysicalTablePath physicalTablePath) {
37+
public RoundRobinBucketAssigner(PhysicalTablePath physicalTablePath, int bucketNumber) {
3738
this.physicalTablePath = physicalTablePath;
39+
this.bucketNumber = bucketNumber;
3840
}
3941

4042
@Override
@@ -47,8 +49,7 @@ public int assignBucket(Cluster cluster) {
4749
return bucketsForTable.get(bucket).getBucketId();
4850
} else {
4951
// no buckets are available, give a non-available bucket.
50-
return MathUtils.toPositive(nextValue)
51-
% cluster.getBucketCount(physicalTablePath.getTablePath());
52+
return MathUtils.toPositive(nextValue) % bucketNumber;
5253
}
5354
}
5455

fluss-client/src/main/java/org/apache/fluss/client/write/StickyBucketAssigner.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,12 @@
3535
public class StickyBucketAssigner extends DynamicBucketAssigner {
3636

3737
private final PhysicalTablePath physicalTablePath;
38+
private final int bucketNumber;
3839
private final AtomicInteger currentBucketId;
3940

40-
public StickyBucketAssigner(PhysicalTablePath physicalTablePath) {
41+
public StickyBucketAssigner(PhysicalTablePath physicalTablePath, int bucketNumber) {
4142
this.physicalTablePath = physicalTablePath;
43+
this.bucketNumber = bucketNumber;
4244
this.currentBucketId = new AtomicInteger(-1);
4345
}
4446

@@ -73,7 +75,7 @@ private int nextBucket(Cluster cluster, int preBucketId) {
7375
cluster.getAvailableBucketsForPhysicalTablePath(physicalTablePath);
7476
if (availableBuckets.isEmpty()) {
7577
int random = MathUtils.toPositive(ThreadLocalRandom.current().nextInt());
76-
newBucket = random % cluster.getBucketCount(physicalTablePath.getTablePath());
78+
newBucket = random % bucketNumber;
7779
} else if (availableBuckets.size() == 1) {
7880
newBucket = availableBuckets.get(0).getBucketId();
7981
} else {

fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -354,9 +354,9 @@ private BucketAssigner createBucketAssigner(
354354
ConfigOptions.NoKeyAssigner noKeyAssigner =
355355
conf.get(ConfigOptions.CLIENT_WRITER_BUCKET_NO_KEY_ASSIGNER);
356356
if (noKeyAssigner == ROUND_ROBIN) {
357-
return new RoundRobinBucketAssigner(physicalTablePath);
357+
return new RoundRobinBucketAssigner(physicalTablePath, bucketNumber);
358358
} else if (noKeyAssigner == STICKY) {
359-
return new StickyBucketAssigner(physicalTablePath);
359+
return new StickyBucketAssigner(physicalTablePath, bucketNumber);
360360
} else {
361361
throw new IllegalArgumentException(
362362
"Unsupported append only row bucket assigner: " + noKeyAssigner);

fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,8 @@ void testAppendWithStickyBucketAssigner() throws Exception {
343343
int batchSize = 100;
344344
IndexedRow row = indexedRow(DATA1_ROW_TYPE, new Object[] {1, "a"});
345345

346-
StickyBucketAssigner bucketAssigner = new StickyBucketAssigner(DATA1_PHYSICAL_TABLE_PATH);
346+
StickyBucketAssigner bucketAssigner =
347+
new StickyBucketAssigner(DATA1_PHYSICAL_TABLE_PATH, 3);
347348
RecordAccumulator accum =
348349
createTestRecordAccumulator(
349350
(int) Duration.ofMinutes(1).toMillis(),

fluss-client/src/test/java/org/apache/fluss/client/write/StickyStaticBucketAssignerTest.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ void testSticky() {
6565
// init cluster.
6666
Cluster cluster = updateCluster(Arrays.asList(bucket1, bucket2, bucket3));
6767
StickyBucketAssigner stickyBucketAssigner =
68-
new StickyBucketAssigner(DATA1_PHYSICAL_TABLE_PATH);
68+
new StickyBucketAssigner(DATA1_PHYSICAL_TABLE_PATH, 3);
6969
int bucketId = stickyBucketAssigner.assignBucket(cluster);
7070
assertThat(bucketId >= 0 && bucketId < 3).isTrue();
7171

@@ -94,7 +94,7 @@ void testBucketIdShouldNotChange() {
9494
// init cluster.
9595
Cluster cluster = updateCluster(Arrays.asList(bucket1, bucket2, bucket3));
9696
StickyBucketAssigner stickyBucketAssigner =
97-
new StickyBucketAssigner(DATA1_PHYSICAL_TABLE_PATH);
97+
new StickyBucketAssigner(DATA1_PHYSICAL_TABLE_PATH, 3);
9898
int bucketId = stickyBucketAssigner.assignBucket(cluster);
9999
for (int i = 0; i < 3; i++) {
100100
if (i != bucketId) {
@@ -111,7 +111,7 @@ void testOnlyOneAvailableBuckets() {
111111
// init cluster.
112112
Cluster cluster = updateCluster(Collections.singletonList(bucket1));
113113
StickyBucketAssigner stickyBucketAssigner =
114-
new StickyBucketAssigner(DATA1_PHYSICAL_TABLE_PATH);
114+
new StickyBucketAssigner(DATA1_PHYSICAL_TABLE_PATH, 3);
115115
int bucketId = stickyBucketAssigner.assignBucket(cluster);
116116

117117
for (int i = 0; i < 100; i++) {
@@ -136,7 +136,7 @@ void testAvailableBucketsTest() {
136136
Cluster cluster = updateCluster(allBuckets);
137137

138138
// Assure we never choose bucket 1 for tp1 because it is unavailable.
139-
StickyBucketAssigner stickyBucketAssigner = new StickyBucketAssigner(tp1);
139+
StickyBucketAssigner stickyBucketAssigner = new StickyBucketAssigner(tp1, 3);
140140
int bucketForTp1 = stickyBucketAssigner.assignBucket(cluster);
141141
assertThat(bucketForTp1).isNotEqualTo(1);
142142
for (int i = 0; i < 100; i++) {
@@ -145,7 +145,7 @@ void testAvailableBucketsTest() {
145145
}
146146

147147
// Assure we always choose bucket 1 for tp2.
148-
stickyBucketAssigner = new StickyBucketAssigner(tp2);
148+
stickyBucketAssigner = new StickyBucketAssigner(tp2, 3);
149149
int bucketForTp2 = stickyBucketAssigner.assignBucket(cluster);
150150
assertThat(bucketForTp2).isEqualTo(1);
151151
for (int i = 0; i < 100; i++) {
@@ -154,7 +154,7 @@ void testAvailableBucketsTest() {
154154
}
155155

156156
// Assure that we can still choose one bucket even if there are no available buckets.
157-
stickyBucketAssigner = new StickyBucketAssigner(tp3);
157+
stickyBucketAssigner = new StickyBucketAssigner(tp3, 3);
158158
int bucketForTp3 = stickyBucketAssigner.assignBucket(cluster);
159159
assertThat(bucketForTp3).isIn(0, 1, 2);
160160
stickyBucketAssigner.onNewBatch(cluster, bucketForTp3);
@@ -165,7 +165,7 @@ void testAvailableBucketsTest() {
165165
void testMultiThreadToCallOnNewBatch() {
166166
Cluster cluster = updateCluster(Arrays.asList(bucket1, bucket2, bucket3));
167167
StickyBucketAssigner stickyBucketAssigner =
168-
new StickyBucketAssigner(PhysicalTablePath.of(DATA1_TABLE_PATH));
168+
new StickyBucketAssigner(PhysicalTablePath.of(DATA1_TABLE_PATH), 3);
169169
int bucketId = stickyBucketAssigner.assignBucket(cluster);
170170
Queue<Integer> bucketIds = new ConcurrentLinkedQueue<>();
171171
Thread[] threads = new Thread[100];

fluss-common/src/main/java/org/apache/fluss/cluster/Cluster.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -173,10 +173,6 @@ public TablePath getTablePathOrElseThrow(long tableId) {
173173
+ " in cluster"));
174174
}
175175

176-
public int getBucketCount(TablePath tablePath) {
177-
return tableInfoByPath.get(tablePath).getNumBuckets();
178-
}
179-
180176
/** Get the bucket location for this table-bucket. */
181177
public Optional<BucketLocation> getBucketLocation(TableBucket tableBucket) {
182178
return Optional.ofNullable(availableLocationByBucket.get(tableBucket));

0 commit comments

Comments
 (0)