
Conversation

@caozhen1937
Contributor

Purpose

Linked issue: close #1993

Brief change log

Tests

API and Format

Documentation

@caozhen1937 force-pushed the FLUSS_ISSUE_1993 branch 2 times, most recently from 96a8077 to 0894e8a on December 8, 2025 at 15:15
Contributor

@swuferhong left a comment


Thanks for your contributions. Overall, LGTM. However, there's one scenario we need to restrict; this is actually the reason this issue was raised. Specifically, when auto-partitioning is enabled, we should enforce a limit based on retained partitions * bucket num per partition; otherwise, the total bucket count could grow excessively large when many partitions are retained.
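
A rough sketch of the kind of check I mean (the accessor names and maxBucketNumOfDb are placeholders here, not the actual Fluss API):

// When auto-partitioning is enabled, bound the projected bucket usage by
// (retained partitions) * (buckets per partition) rather than by a single partition.
int bucketsPerPartition = tableDescriptor.getBucketCount();             // hypothetical accessor
int retainedPartitions = tableDescriptor.getAutoPartitionNumToRetain(); // hypothetical accessor
long projectedBuckets = (long) retainedPartitions * bucketsPerPartition;
if (projectedBuckets > maxBucketNumOfDb) {
    throw new TooManyBucketsException(
            String.format(
                    "Auto-partitioned table %s may retain %d partitions * %d buckets = %d total buckets, exceeding the database-level maximum of %d.",
                    tablePath, retainedPartitions, bucketsPerPartition, projectedBuckets, maxBucketNumOfDb));
}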

maxBucketNum,
dynamicConfigManager.describeConfigs(),
tablePath.getDatabaseName());
validateTableDescriptor(tableToCreate, maxBucketNum, dbLevelMaxBucket);
Contributor

We only need to pass the value returned by DatabaseLimitResolver.resolveMaxBucketForDb into validateTableDescriptor, so the separate maxBucketNum parameter can be removed. Additionally, could we rename the return value of DatabaseLimitResolver.resolveMaxBucketForDb to maxBucketNum for clarity?
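
Something like this (a sketch; clusterMaxBucketNum stands in for however the cluster-wide value is currently named):

int maxBucketNum =
        DatabaseLimitResolver.resolveMaxBucketForDb(
                clusterMaxBucketNum,
                dynamicConfigManager.describeConfigs(),
                tablePath.getDatabaseName());
validateTableDescriptor(tableToCreate, maxBucketNum);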

Contributor Author

@swuferhong maxBucketNum is still used to represent the maximum limit of the cluster. I will use two variables to distinguish them: maxBucketNumOfDb and maxBucketNumOfCluster. What do you think?

Contributor

> @swuferhong maxBucketNum is still used to represent the maximum limit of the cluster. I will use two variables to distinguish them: maxBucketNumOfDb and maxBucketNumOfCluster. What do you think?

What concerns me most is the implementation. Theoretically, the cluster-level logic should be holistic: when creating a table, it should first check how many buckets currently exist in the cluster and then determine whether the table can be created. However, the current cluster-level implementation actually checks individual tables during creation.

I think it's reasonable to separate these two aspects; a holistic cluster-level limit could be considered as a future expansion.

@swuferhong
Contributor

Hi @caozhen1937, LGTM now. Could you add a test on the Flink side that calls this as a procedure?

Member

@wuchong left a comment


Thanks @caozhen1937 for the contribution! The PR addresses an important use case, but I have a few concerns that we should address:

  1. Database-level bucket limit logic is incomplete
    Currently, the check only validates whether a single table exceeds a per-table bucket limit. However, the intended requirement is to enforce a total bucket quota across all tables within a database.
    To support this, we should maintain a database → totalBucketCount mapping in CoordinatorContext (see the first sketch after this list). This state must be:

    • Updated on table/partition creation and deletion
    • Not persisted, as it can be restored from table bucket information
  2. Backward compatibility must be preserved
    The PR removes the existing max.bucket.num configuration, which breaks compatibility. This should be reverted. I found this configuration name is quite confused with the configs introduced in this PR, so we can rename the old config to clarify:

    • max.bucket.numbucket.table.default-limit
    • max.partition.numpartition.table.default-limit
  3. Configuration key design
    Using ad-hoc, non-deterministic keys like database.limit.xxx.max.bucket.num makes documentation and validation difficult.
    A cleaner approach is to use:

    • A single map-style config: bucket.database.limits = db1:1000,db2:3000
    • A fallback default: bucket.database.default-limit = 500

    We can use alter config with the APPEND/SUBTRACT alter types to update or remove per-database limits (see the second sketch after this list).
  4. Missing prerequisites for release
    Two additional issues should be created and completed before this feature is released:

    • Implement a CALL procedure in Flink SQL to dynamically alter cluster-level configs (e.g., database bucket limits)
    • Add comprehensive documentation covering:
      • Configuration semantics
      • Default vs. per-database limits
      • How quota enforcement works
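
A minimal sketch of the bookkeeping described in item 1, assuming illustrative names (DbBucketQuota, bucketCountByDatabase) rather than the actual CoordinatorContext code:

import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only. The map is not persisted; it is rebuilt on
// coordinator startup from the known table bucket information.
class DbBucketQuota {
    private final Map<String, Integer> bucketCountByDatabase = new HashMap<>();

    // called when a table or partition is created
    void onBucketsCreated(String database, int newBuckets) {
        bucketCountByDatabase.merge(database, newBuckets, Integer::sum);
    }

    // called when a table or partition is dropped
    void onBucketsDeleted(String database, int removedBuckets) {
        bucketCountByDatabase.merge(database, -removedBuckets, Integer::sum);
    }

    // quota check performed before creating new buckets in the database
    boolean wouldExceed(String database, int additionalBuckets, int maxBucketNumOfDb) {
        return bucketCountByDatabase.getOrDefault(database, 0) + additionalBuckets > maxBucketNumOfDb;
    }
}

And a sketch of resolving a per-database limit from the map-style config proposed in item 3 (the method name and parsing are illustrative):

// Resolves the limit for one database from a value like "db1:1000,db2:3000",
// falling back to bucket.database.default-limit when the database is not listed.
static int resolveDbBucketLimit(String limitsConfig, int defaultLimit, String database) {
    if (limitsConfig != null && !limitsConfig.isEmpty()) {
        for (String entry : limitsConfig.split(",")) {
            String[] kv = entry.trim().split(":");
            if (kv.length == 2 && kv[0].equals(database)) {
                return Integer.parseInt(kv[1]);
            }
        }
    }
    return defaultLimit;
}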

What do you think about this @caozhen1937 @swuferhong ?

Thanks again for the great work. Looking forward to the updates!


assertThatThrownBy(() -> admin.createTable(tablePath, tooManyBuckets, false).get())
.cause()
.isInstanceOf(TooManyBucketsException.class);
Member


Assert the message as well; it should make clear to the user that the given database has exceeded the allowed number of buckets.
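
For example (the asserted substring is illustrative; match whatever the final message wording is):

assertThatThrownBy(() -> admin.createTable(tablePath, tooManyBuckets, false).get())
        .cause()
        .isInstanceOf(TooManyBucketsException.class)
        .hasMessageContaining("exceeding the database-level maximum");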

tablePath, newPartitionSpec("age", "3"), false)
.get())
.cause()
.isInstanceOf(TooManyBucketsException.class);
Member


ditto

Comment on lines +727 to +739
  int maxBucketNumOfDb =
          DatabaseLimitResolver.resolveMaxBucketForDb(
                  maxBucketNumOfCluster,
                  dynamicConfigManager.describeConfigs(),
                  tablePath.getDatabaseName());
  if (totalBuckets > maxBucketNumOfDb) {
      throw new TooManyBucketsException(
              String.format(
-                     "Adding partition '%s' would result in %d total buckets for table %s, exceeding the maximum of %d buckets.",
+                     "Adding partition '%s' would result in %d total buckets for table %s, exceeding the database-level maximum of %d buckets.",
                      partition.getPartitionName(),
                      totalBuckets,
                      tablePath,
-                     maxBucketNum));
+                     maxBucketNumOfDb));
Member


It seems we simply drop support for maxBucketNumOfCluster / maxBucketNum here. This is backward incompatible; we shouldn't remove them.
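
E.g., keep both checks (a sketch; it assumes both limits remain available in this scope, and the messages are illustrative):

if (totalBuckets > maxBucketNumOfCluster) {
    throw new TooManyBucketsException(
            String.format(
                    "Adding partition '%s' would result in %d total buckets for table %s, exceeding the cluster-level maximum of %d buckets.",
                    partition.getPartitionName(), totalBuckets, tablePath, maxBucketNumOfCluster));
}
if (totalBuckets > maxBucketNumOfDb) {
    throw new TooManyBucketsException(
            String.format(
                    "Adding partition '%s' would result in %d total buckets for table %s, exceeding the database-level maximum of %d buckets.",
                    partition.getPartitionName(), totalBuckets, tablePath, maxBucketNumOfDb));
}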

@swuferhong
Contributor

Hi @wuchong, a small input regarding "Implement a CALL procedure in Flink SQL to dynamically alter cluster-level configs (e.g., database bucket limits)": PR #2178 will support using a CALL procedure to dynamically set cluster-level configurations. However, I'm still testing the parameters introduced in #2178, and I'll leave a comment on that PR once testing is complete.


Development

Successfully merging this pull request may close these issues.

Support maximum number of partitions and buckets at the database level
