Skip to content

Commit 735edb0

Browse files
authored
[server] Introduce default max bucket number for a table (#811)
1 parent bd9e1c4 commit 735edb0

File tree

12 files changed

+270
-12
lines changed

12 files changed

+270
-12
lines changed

fluss-client/src/test/java/com/alibaba/fluss/client/admin/ClientToServerITCaseBase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ private static Configuration initConfig() {
116116
conf.set(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE, MemorySize.parse("1mb"));
117117
conf.set(ConfigOptions.CLIENT_WRITER_BATCH_SIZE, MemorySize.parse("1kb"));
118118
conf.set(ConfigOptions.MAX_PARTITION_NUM, 10);
119+
conf.set(ConfigOptions.MAX_BUCKET_NUM, 30);
119120
return conf;
120121
}
121122

fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import com.alibaba.fluss.exception.SchemaNotExistException;
4040
import com.alibaba.fluss.exception.TableNotExistException;
4141
import com.alibaba.fluss.exception.TableNotPartitionedException;
42+
import com.alibaba.fluss.exception.TooManyBucketsException;
4243
import com.alibaba.fluss.exception.TooManyPartitionsException;
4344
import com.alibaba.fluss.fs.FsPath;
4445
import com.alibaba.fluss.fs.FsPathAndFileName;
@@ -869,4 +870,74 @@ private List<FsPathAndFileName> toFsPathAndFileNames(KvSnapshotHandle kvSnapshot
869870
kvFileHandleAndLocalPath.getLocalPath()))
870871
.collect(Collectors.toList());
871872
}
873+
874+
/**
875+
* Test that adding a partition that pushes the table's total bucket count over the maximum
876+
* throws TooManyBucketsException.
877+
*/
878+
@Test
879+
public void testAddTooManyBuckets() throws Exception {
880+
// Already set low maximum bucket limit to 30 for this test in ClientToServerITCaseBase
881+
String dbName = DEFAULT_TABLE_PATH.getDatabaseName();
882+
TableDescriptor partitionedTable =
883+
TableDescriptor.builder()
884+
.schema(
885+
Schema.newBuilder()
886+
.column("id", DataTypes.STRING())
887+
.column("name", DataTypes.STRING())
888+
.column("age", DataTypes.STRING())
889+
.build())
890+
.distributedBy(10, "id") // 10 buckets per partition
891+
.partitionedBy("age")
892+
.build();
893+
TablePath tablePath = TablePath.of(dbName, "test_add_too_many_buckets_table");
894+
admin.createTable(tablePath, partitionedTable, true).get();
895+
896+
// Add 3 partitions (3 * 10 = 30 buckets, which is the limit)
897+
for (int i = 0; i < 3; i++) {
898+
admin.createPartition(tablePath, newPartitionSpec("age", String.valueOf(i)), false)
899+
.get();
900+
}
901+
902+
// Try to add one more partition, exceeding the bucket limit (4 * 10 > 30)
903+
assertThatThrownBy(
904+
() ->
905+
admin.createPartition(
906+
tablePath, newPartitionSpec("age", "4"), false)
907+
.get())
908+
.cause()
909+
.isInstanceOf(TooManyBucketsException.class)
910+
.hasMessageContaining("exceeding the maximum of 30 buckets");
911+
}
912+
913+
/**
914+
* Test that creating a non-partitioned table with bucket count exceeding the maximum throws
915+
* TooManyBucketsException.
916+
*/
917+
@Test
918+
public void testBucketLimitForNonPartitionedTable() throws Exception {
919+
// This test relies on the low maximum bucket limit (30) that is already
920+
// configured in ClientToServerITCaseBase; nothing is set here.
921+
String dbName = DEFAULT_TABLE_PATH.getDatabaseName();
922+
923+
// Create a non-partitioned table with 40 buckets (exceeding limit of 30)
924+
TableDescriptor nonPartitionedTable =
925+
TableDescriptor.builder()
926+
.schema(
927+
Schema.newBuilder()
928+
.column("id", DataTypes.STRING())
929+
.column("name", DataTypes.STRING())
930+
.column("value", DataTypes.STRING())
931+
.build())
932+
.distributedBy(40, "id") // 40 buckets exceeds the limit of 30
933+
.build(); // No partitionedBy call makes this non-partitioned
934+
935+
TablePath tablePath = TablePath.of(dbName, "test_too_many_buckets_non_partitioned");
936+
937+
// Creating this table should throw TooManyBucketsException
938+
assertThatThrownBy(() -> admin.createTable(tablePath, nonPartitionedTable, false).get())
939+
.cause()
940+
.isInstanceOf(TooManyBucketsException.class)
941+
.hasMessageContaining("exceeds the maximum limit");
942+
}
872943
}

fluss-client/src/test/java/com/alibaba/fluss/client/table/scanner/log/LogScannerITCase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,8 @@ void testPoll() throws Exception {
8787

8888
@Test
8989
void testPollWhileCreateTableNotReady() throws Exception {
90-
// create one table with 100 buckets.
91-
int bucketNumber = 100;
90+
// create one table with 30 buckets.
91+
int bucketNumber = 30;
9292
TableDescriptor tableDescriptor =
9393
TableDescriptor.builder().schema(DATA1_SCHEMA).distributedBy(bucketNumber).build();
9494
createTable(DATA1_TABLE_PATH, tableDescriptor, false);

fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,14 @@ public class ConfigOptions {
169169
+ "and each super user should be specified in the format 'principal_type:principal_name', e.g., 'User:admin;User:bob'. "
170170
+ "This configuration is critical for defining administrative privileges in the system.");
171171

172+
public static final ConfigOption<Integer> MAX_BUCKET_NUM =
173+
key("max.bucket.num")
174+
.intType()
175+
.defaultValue(128000)
176+
.withDescription(
177+
"The maximum number of buckets that can be created for a table."
178+
+ "The default value is 128000");
179+
172180
// ------------------------------------------------------------------------
173181
// ConfigOptions for Coordinator Server
174182
// ------------------------------------------------------------------------
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright (c) 2025 Alibaba Group Holding Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.alibaba.fluss.exception;
18+
19+
import com.alibaba.fluss.annotation.PublicEvolving;
20+
21+
/**
22+
* This exception is thrown if the number of table buckets exceeds max.bucket.num.
23+
*
24+
* @since 0.7
25+
*/
26+
@PublicEvolving
27+
public class TooManyBucketsException extends ApiException {
28+
29+
private static final long serialVersionUID = 1L;
30+
31+
public TooManyBucketsException(String message) {
32+
super(message);
33+
}
34+
35+
public TooManyBucketsException(String message, Throwable cause) {
36+
super(message, cause);
37+
}
38+
}

fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/Errors.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import com.alibaba.fluss.exception.TableNotExistException;
6262
import com.alibaba.fluss.exception.TableNotPartitionedException;
6363
import com.alibaba.fluss.exception.TimeoutException;
64+
import com.alibaba.fluss.exception.TooManyBucketsException;
6465
import com.alibaba.fluss.exception.TooManyPartitionsException;
6566
import com.alibaba.fluss.exception.UnknownServerException;
6667
import com.alibaba.fluss.exception.UnknownTableOrBucketException;
@@ -196,7 +197,9 @@ public enum Errors {
196197
45, "Exceed the maximum number of partitions.", TooManyPartitionsException::new),
197198
AUTHENTICATE_EXCEPTION(46, "Authentication failed.", AuthenticationException::new),
198199
SECURITY_DISABLED_EXCEPTION(47, "Security is disabled.", SecurityDisabledException::new),
199-
AUTHORIZATION_EXCEPTION(48, "Authorization failed", AuthorizationException::new);
200+
AUTHORIZATION_EXCEPTION(48, "Authorization failed", AuthorizationException::new),
201+
BUCKET_MAX_NUM_EXCEPTION(
202+
49, "Exceed the maximum number of buckets", TooManyBucketsException::new);
200203

201204
private static final Logger LOG = LoggerFactory.getLogger(Errors.class);
202205

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/AutoPartitionManager.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.alibaba.fluss.config.Configuration;
2424
import com.alibaba.fluss.exception.PartitionAlreadyExistsException;
2525
import com.alibaba.fluss.exception.PartitionNotExistException;
26+
import com.alibaba.fluss.exception.TooManyBucketsException;
2627
import com.alibaba.fluss.exception.TooManyPartitionsException;
2728
import com.alibaba.fluss.metadata.ResolvedPartitionSpec;
2829
import com.alibaba.fluss.metadata.TableInfo;
@@ -319,6 +320,12 @@ private void createPartitions(
319320
+ "because exceed the maximum number of partitions.",
320321
partition,
321322
tablePath);
323+
} catch (TooManyBucketsException t) {
324+
LOG.warn(
325+
"Auto partitioning skip to create partition {} for table [{}], "
326+
+ "because exceed the maximum number of buckets.",
327+
partition,
328+
tablePath);
322329
} catch (Exception e) {
323330
LOG.error(
324331
"Auto partitioning failed to create partition {} for table [{}].",

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,6 @@
100100
import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.makeCreateAclsResponse;
101101
import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.makeDropAclsResponse;
102102
import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.toTablePath;
103-
import static com.alibaba.fluss.server.utils.TableDescriptorValidation.validateTableDescriptor;
104103
import static com.alibaba.fluss.utils.PartitionUtils.validatePartitionSpec;
105104
import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
106105
import static com.alibaba.fluss.utils.Preconditions.checkState;
@@ -235,9 +234,8 @@ public CompletableFuture<CreateTableResponse> createTable(CreateTableRequest req
235234
TableAssignmentUtils.generateAssignment(bucketCount, replicaFactor, servers);
236235
}
237236

238-
// validate table properties before creating table
239-
validateTableDescriptor(tableDescriptor);
240-
237+
// TODO: should tolerate the case where the lake table exists but matches our schema. This
238+
// ensures eventual consistency by idempotently creating the table multiple times. See #846
241239
// before create table in fluss, we may create in lake
242240
if (isDataLakeEnabled(tableDescriptor)) {
243241
try {

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/MetadataManager.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.alibaba.fluss.exception.TableAlreadyExistException;
2929
import com.alibaba.fluss.exception.TableNotExistException;
3030
import com.alibaba.fluss.exception.TableNotPartitionedException;
31+
import com.alibaba.fluss.exception.TooManyBucketsException;
3132
import com.alibaba.fluss.exception.TooManyPartitionsException;
3233
import com.alibaba.fluss.metadata.DatabaseDescriptor;
3334
import com.alibaba.fluss.metadata.DatabaseInfo;
@@ -58,6 +59,8 @@
5859
import java.util.Set;
5960
import java.util.concurrent.Callable;
6061

62+
import static com.alibaba.fluss.server.utils.TableDescriptorValidation.validateTableDescriptor;
63+
6164
/** A manager for metadata. */
6265
public class MetadataManager {
6366

@@ -66,6 +69,7 @@ public class MetadataManager {
6669
private final ZooKeeperClient zookeeperClient;
6770
private @Nullable final Map<String, String> defaultTableLakeOptions;
6871
private final int maxPartitionNum;
72+
private final int maxBucketNum;
6973

7074
/**
7175
* Creates a new metadata manager.
@@ -76,7 +80,8 @@ public class MetadataManager {
7680
public MetadataManager(ZooKeeperClient zookeeperClient, Configuration conf) {
7781
this.zookeeperClient = zookeeperClient;
7882
this.defaultTableLakeOptions = LakeStorageUtils.generateDefaultTableLakeOptions(conf);
79-
maxPartitionNum = conf.get(ConfigOptions.MAX_PARTITION_NUM);
83+
this.maxPartitionNum = conf.get(ConfigOptions.MAX_PARTITION_NUM);
84+
this.maxBucketNum = conf.get(ConfigOptions.MAX_BUCKET_NUM);
8085
}
8186

8287
public void createDatabase(
@@ -227,6 +232,9 @@ public long createTable(
227232
@Nullable TableAssignment tableAssignment,
228233
boolean ignoreIfExists)
229234
throws TableAlreadyExistException, DatabaseNotExistException {
235+
// validate table properties before creating table
236+
validateTableDescriptor(tableToCreate, maxBucketNum);
237+
230238
if (!databaseExists(tablePath.getDatabaseName())) {
231239
throw new DatabaseNotExistException(
232240
"Database " + tablePath.getDatabaseName() + " does not exist.");
@@ -364,8 +372,9 @@ public void createPartition(
364372
partition.getPartitionQualifiedName(), tablePath));
365373
}
366374

375+
final int partitionNumber;
367376
try {
368-
int partitionNumber = zookeeperClient.getPartitionNumber(tablePath);
377+
partitionNumber = zookeeperClient.getPartitionNumber(tablePath);
369378
if (partitionNumber + 1 > maxPartitionNum) {
370379
throw new TooManyPartitionsException(
371380
String.format(
@@ -382,6 +391,26 @@ public void createPartition(
382391
e);
383392
}
384393

394+
try {
395+
int bucketCount = partitionAssignment.getBucketAssignments().size();
396+
// currently, every partition has the same bucket count
397+
int totalBuckets = bucketCount * (partitionNumber + 1);
398+
if (totalBuckets > maxBucketNum) {
399+
throw new TooManyBucketsException(
400+
String.format(
401+
"Adding partition '%s' would result in %d total buckets for table %s, exceeding the maximum of %d buckets.",
402+
partition.getPartitionName(),
403+
totalBuckets,
404+
tablePath,
405+
maxBucketNum));
406+
}
407+
} catch (TooManyBucketsException e) {
408+
throw e;
409+
} catch (Exception e) {
410+
throw new FlussRuntimeException(
411+
String.format("Failed to check total bucket count for table %s", tablePath), e);
412+
}
413+
385414
try {
386415
long partitionId = zookeeperClient.getPartitionIdAndIncrement();
387416
// register partition assignments to zk first

fluss-server/src/main/java/com/alibaba/fluss/server/utils/TableDescriptorValidation.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.alibaba.fluss.config.ReadableConfig;
2323
import com.alibaba.fluss.exception.InvalidConfigException;
2424
import com.alibaba.fluss.exception.InvalidTableException;
25+
import com.alibaba.fluss.exception.TooManyBucketsException;
2526
import com.alibaba.fluss.metadata.KvFormat;
2627
import com.alibaba.fluss.metadata.LogFormat;
2728
import com.alibaba.fluss.metadata.MergeEngineType;
@@ -42,7 +43,7 @@
4243
public class TableDescriptorValidation {
4344

4445
/** Validates that the table descriptor to create is valid and contains all necessary information. */
45-
public static void validateTableDescriptor(TableDescriptor tableDescriptor) {
46+
public static void validateTableDescriptor(TableDescriptor tableDescriptor, int maxBucketNum) {
4647
boolean hasPrimaryKey = tableDescriptor.getSchema().getPrimaryKey().isPresent();
4748
RowType schema = tableDescriptor.getSchema().getRowType();
4849
Configuration tableConf = Configuration.fromMap(tableDescriptor.getProperties());
@@ -62,7 +63,7 @@ public static void validateTableDescriptor(TableDescriptor tableDescriptor) {
6263
}
6364

6465
// check distribution
65-
checkDistribution(tableDescriptor);
66+
checkDistribution(tableDescriptor, maxBucketNum);
6667

6768
// check individual options
6869
checkReplicationFactor(tableConf);
@@ -73,13 +74,20 @@ public static void validateTableDescriptor(TableDescriptor tableDescriptor) {
7374
checkPartition(tableConf, tableDescriptor.getPartitionKeys(), schema);
7475
}
7576

76-
private static void checkDistribution(TableDescriptor tableDescriptor) {
77+
private static void checkDistribution(TableDescriptor tableDescriptor, int maxBucketNum) {
7778
if (!tableDescriptor.getTableDistribution().isPresent()) {
7879
throw new InvalidTableException("Table distribution is required.");
7980
}
8081
if (!tableDescriptor.getTableDistribution().get().getBucketCount().isPresent()) {
8182
throw new InvalidTableException("Bucket number must be set.");
8283
}
84+
int bucketCount = tableDescriptor.getTableDistribution().get().getBucketCount().get();
85+
if (bucketCount > maxBucketNum) {
86+
throw new TooManyBucketsException(
87+
String.format(
88+
"Bucket count %s exceeds the maximum limit %s.",
89+
bucketCount, maxBucketNum));
90+
}
8391
}
8492

8593
private static void checkReplicationFactor(Configuration tableConf) {

0 commit comments

Comments
 (0)