Skip to content

Commit e3f0c6c

Browse files
refactor code & fix document link
1 parent e5eeb03 commit e3f0c6c

File tree

10 files changed

+70
-31
lines changed

10 files changed

+70
-31
lines changed

fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ private void updateCurrentConfig(Map<String, String> newDynamicConfigs, boolean
154154

155155
// Early return if no effective changes
156156
if (effectiveChanges.isEmpty()) {
157+
LOG.info("No effective config changes detected for: {}", newDynamicConfigs);
157158
return;
158159
}
159160

@@ -166,7 +167,7 @@ private void updateCurrentConfig(Map<String, String> newDynamicConfigs, boolean
166167

167168
// Update internal state
168169
updateInternalState(newConfig, newConfigMap, newDynamicConfigs);
169-
LOG.info("Dynamic configs changed: {}", newDynamicConfigs);
170+
LOG.info("Dynamic configs changed: {}", effectiveChanges);
170171
}
171172

172173
/**
@@ -345,14 +346,17 @@ private void validateSingleConfig(String configKey, String oldValueStr, String n
345346
try {
346347
newValue = tempConfig.getOptional(configOption).get();
347348
} catch (Exception e) {
349+
String causeMessage =
350+
e.getMessage() != null ? e.getMessage() : e.getClass().getSimpleName();
348351
throw new ConfigException(
349352
String.format(
350-
"Cannot parse '%s' as %s for config '%s'",
353+
"Cannot parse '%s' as %s for config '%s': %s",
351354
newValueStr,
352355
configOption.isList()
353356
? "List<" + configOption.getClazz().getSimpleName() + ">"
354357
: configOption.getClazz().getSimpleName(),
355-
configKey),
358+
configKey,
359+
causeMessage),
356360
e);
357361
}
358362
}

fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,39 @@
7474
public final class KvManager extends TabletManagerBase implements ServerReconfigurable {
7575

7676
private static final Logger LOG = LoggerFactory.getLogger(KvManager.class);
77+
78+
/**
79+
* Default global rate limiter with unlimited rate (Long.MAX_VALUE bytes per second).
80+
*
81+
* <p>This is used by RocksDBResourceContainer when no rate limiter is explicitly provided,
82+
* ensuring the API is safer and more robust by avoiding null checks throughout the code.
83+
*/
84+
private static final RateLimiter DEFAULT_RATE_LIMITER = createDefaultRateLimiter();
85+
86+
/**
87+
* Creates a default rate limiter with unlimited rate (Long.MAX_VALUE bytes per second).
88+
*
89+
* @return a default rate limiter instance
90+
*/
91+
private static RateLimiter createDefaultRateLimiter() {
92+
RocksDB.loadLibrary();
93+
// Create a rate limiter with unlimited rate (effectively no limit)
94+
// Using default refill period and fairness values
95+
return new RateLimiter(Long.MAX_VALUE);
96+
}
97+
98+
/**
99+
* Returns the default global rate limiter with unlimited rate.
100+
*
101+
* <p>This method provides access to the default rate limiter for use in
102+
* RocksDBResourceContainer when no rate limiter is explicitly provided.
103+
*
104+
* @return the default rate limiter instance
105+
*/
106+
public static RateLimiter getDefaultRateLimiter() {
107+
return DEFAULT_RATE_LIMITER;
108+
}
109+
77110
private final LogManager logManager;
78111

79112
private final TabletServerMetricGroup serverMetricGroup;

fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBResourceContainer.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.fluss.config.ConfigOptions;
2323
import org.apache.fluss.config.Configuration;
2424
import org.apache.fluss.config.ReadableConfig;
25+
import org.apache.fluss.server.kv.KvManager;
2526
import org.apache.fluss.utils.FileUtils;
2627
import org.apache.fluss.utils.IOUtils;
2728

@@ -49,6 +50,8 @@
4950
import java.util.Collection;
5051
import java.util.List;
5152

53+
import static org.apache.fluss.utils.Preconditions.checkNotNull;
54+
5255
/* This file is based on source code of Apache Flink Project (https://flink.apache.org/), licensed by the Apache
5356
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
5457
* additional information regarding copyright ownership. */
@@ -82,18 +85,18 @@ public class RocksDBResourceContainer implements AutoCloseable {
8285

8386
@VisibleForTesting
8487
RocksDBResourceContainer() {
85-
this(new Configuration(), null, false, null);
88+
this(new Configuration(), null, false, KvManager.getDefaultRateLimiter());
8689
}
8790

8891
public RocksDBResourceContainer(ReadableConfig configuration, @Nullable File instanceBasePath) {
89-
this(configuration, instanceBasePath, false, null);
92+
this(configuration, instanceBasePath, false, KvManager.getDefaultRateLimiter());
9093
}
9194

9295
public RocksDBResourceContainer(
9396
ReadableConfig configuration,
9497
@Nullable File instanceBasePath,
9598
boolean enableStatistics) {
96-
this(configuration, instanceBasePath, enableStatistics, null);
99+
this(configuration, instanceBasePath, enableStatistics, KvManager.getDefaultRateLimiter());
97100
}
98101

99102
public RocksDBResourceContainer(
@@ -108,7 +111,8 @@ public RocksDBResourceContainer(
108111
? RocksDBKvBuilder.getInstanceRocksDBPath(instanceBasePath)
109112
: null;
110113
this.enableStatistics = enableStatistics;
111-
this.sharedRateLimiter = sharedRateLimiter;
114+
this.sharedRateLimiter =
115+
checkNotNull(sharedRateLimiter, "sharedRateLimiter must not be null");
112116

113117
this.handlesToClose = new ArrayList<>();
114118
}
@@ -130,10 +134,8 @@ public DBOptions getDbOptions() throws IOException {
130134
// add necessary default options
131135
opt = opt.setCreateIfMissing(true);
132136

133-
// set shared rate limiter if provided
134-
if (sharedRateLimiter != null) {
135-
opt.setRateLimiter(sharedRateLimiter);
136-
}
137+
// set shared rate limiter
138+
opt.setRateLimiter(sharedRateLimiter);
137139

138140
if (enableStatistics) {
139141
Statistics statistics = new Statistics();

fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ private KvTablet createKvTablet(
192192
DEFAULT_COMPRESSION,
193193
schemaGetter,
194194
tableConf.getChangelogImage(),
195-
null);
195+
KvManager.getDefaultRateLimiter());
196196
}
197197

198198
@Test

website/docs/engine-flink/ddl/index.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ CREATE TABLE my_part_log_table (
125125
Fluss partitioned table supports dynamic partition creation, which means you can write data into a partition without pre-creating it.
126126
You can use the `INSERT INTO` statement to write data into a partitioned table, and Fluss will automatically create the partition if it does not exist.
127127
See the [Dynamic Partitioning](table-design/data-distribution/partitioning.md#dynamic-partitioning) for more details.
128-
But you can still use the [Add Partition](engine-flink/ddl.md#add-partition) statement to manually add partitions if needed.
128+
But you can still use the [Add Partition](engine-flink/ddl/index.md#add-partition) statement to manually add partitions if needed.
129129
:::
130130

131131
#### Multi-Fields Partitioned Table

website/docs/engine-flink/getting-started.md

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,19 @@ For Flink's Table API, Fluss supports the following features:
2222

2323
| Feature Support | Flink | Notes |
2424
|---------------------------------------------------|-------|----------------------------------------|
25-
| [SQL Create Catalog](ddl.md#create-catalog) | ✔️ | |
26-
| [SQL Create Database](ddl.md#create-database) | ✔️ | |
27-
| [SQL Drop Database](ddl.md#drop-database) | ✔️ | |
28-
| [SQL Create Table](ddl.md#create-table) | ✔️ | |
29-
| [SQL Create Table Like](ddl.md#create-table-like) | ✔️ | |
30-
| [SQL Drop Table](ddl.md#drop-table) | ✔️ | |
31-
| [SQL Create Materialized Table](ddl.md#materialized-table) | ✔️ | Continuous refresh mode only |
32-
| [SQL Alter Materialized Table](ddl.md#alter-materialized-table) | ✔️ | Suspend/Resume support |
33-
| [SQL Drop Materialized Table](ddl.md#drop-materialized-table) | ✔️ | |
34-
| [SQL Show Partitions](ddl.md#show-partitions) | ✔️ | |
35-
| [SQL Add Partition](ddl.md#add-partition) | ✔️ | |
36-
| [SQL Drop Partition](ddl.md#drop-partition) | ✔️ | |
37-
| [Procedures](ddl.md#procedures) | ✔️ | ACL management and cluster configuration |
25+
| [SQL Create Catalog](ddl/index.md#create-catalog) | ✔️ | |
26+
| [SQL Create Database](ddl/index.md#create-database) | ✔️ | |
27+
| [SQL Drop Database](ddl/index.md#drop-database) | ✔️ | |
28+
| [SQL Create Table](ddl/index.md#create-table) | ✔️ | |
29+
| [SQL Create Table Like](ddl/index.md#create-table-like) | ✔️ | |
30+
| [SQL Drop Table](ddl/index.md#drop-table) | ✔️ | |
31+
| [SQL Create Materialized Table](ddl/index.md#materialized-table) | ✔️ | Continuous refresh mode only |
32+
| [SQL Alter Materialized Table](ddl/index.md#alter-materialized-table) | ✔️ | Suspend/Resume support |
33+
| [SQL Drop Materialized Table](ddl/index.md#drop-materialized-table) | ✔️ | |
34+
| [SQL Show Partitions](ddl/index.md#show-partitions) | ✔️ | |
35+
| [SQL Add Partition](ddl/index.md#add-partition) | ✔️ | |
36+
| [SQL Drop Partition](ddl/index.md#drop-partition) | ✔️ | |
37+
| [Procedures](ddl/index.md#procedures) | ✔️ | ACL management and cluster configuration |
3838
| [SQL Select](reads.md) | ✔️ | Support both streaming and batch mode. |
3939
| [SQL Limit](reads.md#limit-read) | ✔️ | Only for Log Table |
4040
| [SQL Insert Into](writes.md) | ✔️ | Support both streaming and batch mode. |

website/docs/engine-flink/options.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ Using `ALTER TABLE ... SET` statement to modify the table options. For example,
5757
ALTER TABLE log_table SET ('table.datalake.enable' = 'true');
5858
```
5959

60-
See more details about [ALTER TABLE ... SET](engine-flink/ddl.md#set-properties) and [ALTER TABLE ... RESET](engine-flink/ddl.md#reset-properties) documentation.
60+
See more details about [ALTER TABLE ... SET](engine-flink/ddl/index.md#set-properties) and [ALTER TABLE ... RESET](engine-flink/ddl/index.md#reset-properties) documentation.
6161

6262
## Storage Options
6363

website/docs/maintenance/operations/updating-configs.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,11 @@ The `AlterConfig` class contains three properties:
4848

4949
### Using Flink Stored Procedures
5050

51-
For certain configurations, Fluss provides convenient Flink stored procedures that can be called directly from Flink SQL. See [Procedures](../../engine-flink/ddl/procedures.md#cluster-configuration-procedures) for detailed documentation on using `get_cluster_config` and `set_cluster_config` procedures.
51+
For certain configurations, Fluss provides convenient Flink stored procedures that can be called directly from Flink SQL. See [Procedures](engine-flink/ddl/procedures.md#cluster-configuration-procedures) for detailed documentation on using `get_cluster_config` and `set_cluster_config` procedures.
5252

5353
## Updating Table Configs
5454

55-
The connector options on a table including [Storage Options](engine-flink/options.md#storage-options) can be updated dynamically by [ALTER TABLE ... SET](engine-flink/ddl.md#alter-table) statement. See the example below:
55+
The connector options on a table including [Storage Options](engine-flink/options.md#storage-options) can be updated dynamically by [ALTER TABLE ... SET](engine-flink/ddl/index.md#alter-table) statement. See the example below:
5656

5757
```sql
5858
-- Enable lakehouse storage for the given table

website/docs/table-design/data-distribution/bucketing.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ sidebar_position: 1
88
A bucketing strategy is a data distribution technique that divides table data into small pieces
99
and distributes the data to multiple hosts and services.
1010

11-
When creating a Fluss table, you can specify the number of buckets by setting `'bucket.num' = '<num>'` property for the table, see more details in [DDL](engine-flink/ddl.md).
11+
When creating a Fluss table, you can specify the number of buckets by setting `'bucket.num' = '<num>'` property for the table, see more details in [DDL](engine-flink/ddl/index.md).
1212
Currently, Fluss supports 3 bucketing strategies: **Hash Bucketing**, **Sticky Bucketing** and **Round-Robin Bucketing**.
1313
Primary-Key Tables only allow to use **Hash Bucketing**. Log Tables use **Sticky Bucketing** by default but can use other two bucketing strategies.
1414

website/docs/table-design/data-distribution/partitioning.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ sidebar_position: 2
99
In Fluss, a **Partitioned Table** organizes data based on one or more partition keys, providing a way to improve query performance and manageability for large datasets. Partitions allow the system to divide data into distinct segments, each corresponding to specific values of the partition keys.
1010

1111
For partitioned tables, Fluss supports three strategies of managing partitions.
12-
- **Manual management partitions**, user can create new partitions or drop exists partitions. Learn how to create or drop partitions please refer to [Add Partition](engine-flink/ddl.md#add-partition) and [Drop Partition](engine-flink/ddl.md#drop-partition).
12+
- **Manual management partitions**, user can create new partitions or drop existing partitions. To learn how to create or drop partitions, please refer to [Add Partition](engine-flink/ddl/index.md#add-partition) and [Drop Partition](engine-flink/ddl/index.md#drop-partition).
1313
- **Auto management partitions**, the partitions will be created based on the auto partitioning rules configured at the time of table creation, and expired partitions are automatically removed, ensuring data not expanding unlimited. See [Auto Partitioning](table-design/data-distribution/partitioning.md#auto-partitioning).
1414
- **Dynamic create partitions**, the partitions will be created automatically based on the data being written to the table. See [Dynamic Partitioning](table-design/data-distribution/partitioning.md#dynamic-partitioning).
1515

0 commit comments

Comments
 (0)