Skip to content

Commit a1aeae3

Browse files
refactor code
1 parent 83e7b14 commit a1aeae3

File tree

15 files changed

+540
-478
lines changed

15 files changed

+540
-478
lines changed

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,12 @@
2828
import org.apache.fluss.metadata.MergeEngineType;
2929
import org.apache.fluss.utils.ArrayUtils;
3030

31+
import java.lang.reflect.Field;
3132
import java.time.Duration;
3233
import java.time.ZoneId;
3334
import java.util.Arrays;
3435
import java.util.Collections;
36+
import java.util.HashMap;
3537
import java.util.List;
3638
import java.util.Map;
3739

@@ -1579,12 +1581,13 @@ public class ConfigOptions {
15791581
public static final ConfigOption<MemorySize> KV_SHARED_RATE_LIMITER_BYTES_PER_SEC =
15801582
key("kv.rocksdb.shared-rate-limiter.bytes-per-sec")
15811583
.memoryType()
1582-
.defaultValue(MemorySize.parse("100mb"))
1584+
.defaultValue(new MemorySize(Long.MAX_VALUE))
15831585
.withDescription(
15841586
"The shared rate limit in bytes per second for RocksDB flush and compaction operations "
15851587
+ "across all RocksDB instances in the TabletServer. "
15861588
+ "All KV tablets share a single global RateLimiter to prevent disk IO from being saturated. "
1587-
+ "The default value is `100MB/s`. Set to 0 to disable rate limiting.");
1589+
+ "The RateLimiter is always enabled. The default value is Long.MAX_VALUE (effectively unlimited). "
1590+
+ "Set to a lower value (e.g., 100MB) to limit the rate.");
15881591

15891592
// --------------------------------------------------------------------------
15901593
// Provided configurable ColumnFamilyOptions within Fluss
@@ -1879,4 +1882,45 @@ public enum KvCompressionType {
18791882
LZ4,
18801883
ZSTD
18811884
}
1885+
1886+
// ------------------------------------------------------------------------
1887+
// ConfigOptions Registry and Utilities
1888+
// ------------------------------------------------------------------------
1889+
1890+
/**
1891+
* Holder class for lazy initialization of ConfigOptions registry. This ensures that the
1892+
* registry is initialized only when first accessed, and guarantees that all ConfigOption fields
1893+
* are already initialized (since static initialization happens in declaration order).
1894+
*/
1895+
private static class ConfigOptionsHolder {
1896+
private static final Map<String, ConfigOption<?>> CONFIG_OPTIONS_BY_KEY;
1897+
1898+
static {
1899+
Map<String, ConfigOption<?>> options = new HashMap<>();
1900+
Field[] fields = ConfigOptions.class.getFields();
1901+
for (Field field : fields) {
1902+
if (!ConfigOption.class.isAssignableFrom(field.getType())) {
1903+
continue;
1904+
}
1905+
try {
1906+
ConfigOption<?> configOption = (ConfigOption<?>) field.get(null);
1907+
options.put(configOption.key(), configOption);
1908+
} catch (IllegalAccessException e) {
1909+
// Ignore fields that cannot be accessed
1910+
}
1911+
}
1912+
CONFIG_OPTIONS_BY_KEY = Collections.unmodifiableMap(options);
1913+
}
1914+
}
1915+
1916+
/**
1917+
* Gets the ConfigOption for a given key.
1918+
*
1919+
* @param key the configuration key
1920+
* @return the ConfigOption if found, null otherwise
1921+
*/
1922+
@Internal
1923+
public static ConfigOption<?> getConfigOption(String key) {
1924+
return ConfigOptionsHolder.CONFIG_OPTIONS_BY_KEY.get(key);
1925+
}
18821926
}

fluss-common/src/main/java/org/apache/fluss/config/cluster/ConfigValidator.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,15 @@
3939
*
4040
* <p>This interface is designed to be stateless and thread-safe. Implementations should not rely on
4141
* any mutable component state.
42+
*
43+
* <p><b>Type-safe validation:</b> The validator receives strongly-typed values that have already
44+
* been parsed and validated for basic type correctness. This avoids redundant string parsing and
45+
* allows validators to focus on business logic validation.
46+
*
47+
* @param <T> the type of the configuration value being validated
4248
*/
4349
@PublicEvolving
44-
public interface ConfigValidator {
50+
public interface ConfigValidator<T> {
4551

4652
/**
4753
* Returns the configuration key this validator monitors.
@@ -68,5 +74,5 @@ public interface ConfigValidator {
6874
* @throws ConfigException if the configuration change is invalid, with a descriptive error
6975
* message explaining why the change cannot be applied
7076
*/
71-
void validate(@Nullable String oldValue, @Nullable String newValue) throws ConfigException;
77+
void validate(@Nullable T oldValue, @Nullable T newValue) throws ConfigException;
7278
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/GetClusterConfigProcedure.java

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
*
4646
* <pre>
4747
* -- Get a specific configuration
48-
* CALL sys.get_cluster_config('kv.shared-rate-limiter.bytes-per-sec');
48+
* CALL sys.get_cluster_config('kv.rocksdb.shared-rate-limiter.bytes-per-sec');
4949
*
5050
* -- Get all cluster configurations
5151
* CALL sys.get_cluster_config();
@@ -86,10 +86,6 @@ private Row[] getConfigs(@Nullable String configKey) throws Exception {
8686
entry.value(),
8787
entry.source() != null ? entry.source().name() : "UNKNOWN"));
8888
}
89-
90-
if (results.isEmpty()) {
91-
return new Row[] {Row.of("No cluster configurations found", null, null)};
92-
}
9389
} else {
9490
// Find specific configuration
9591
for (ConfigEntry entry : configs) {
@@ -104,15 +100,6 @@ private Row[] getConfigs(@Nullable String configKey) throws Exception {
104100
break;
105101
}
106102
}
107-
108-
if (results.isEmpty()) {
109-
return new Row[] {
110-
Row.of(
111-
String.format("Configuration key '%s' not found", configKey),
112-
null,
113-
null)
114-
};
115-
}
116103
}
117104

118105
return results.toArray(new Row[0]);

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigProcedure.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,12 @@
4545
*
4646
* <pre>
4747
* -- Set a configuration
48-
* CALL sys.set_cluster_config('kv.shared-rate-limiter.bytes-per-sec', '200MB');
48+
* CALL sys.set_cluster_config('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '200MB');
4949
* CALL sys.set_cluster_config('datalake.format', 'paimon');
5050
*
5151
* -- Delete a configuration (reset to default)
52-
* CALL sys.set_cluster_config('kv.shared-rate-limiter.bytes-per-sec', NULL);
53-
* CALL sys.set_cluster_config('kv.shared-rate-limiter.bytes-per-sec', '');
52+
* CALL sys.set_cluster_config('kv.rocksdb.shared-rate-limiter.bytes-per-sec', NULL);
53+
* CALL sys.set_cluster_config('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '');
5454
* </pre>
5555
*
5656
* <p><b>Note:</b> Not all configurations support dynamic changes. The server will validate the

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java

Lines changed: 16 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import org.apache.flink.types.Row;
3030
import org.apache.flink.util.CloseableIterator;
3131
import org.apache.flink.util.CollectionUtil;
32-
import org.junit.jupiter.api.AfterEach;
3332
import org.junit.jupiter.api.BeforeEach;
3433
import org.junit.jupiter.api.Test;
3534
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -87,31 +86,6 @@ void before() throws ExecutionException, InterruptedException {
8786
tEnv.executeSql("use catalog " + CATALOG_NAME);
8887
}
8988

90-
@AfterEach
91-
void after() throws Exception {
92-
// Clean up any dynamic config changes made during the test
93-
// to ensure tests don't affect each other
94-
if (tEnv != null) {
95-
try {
96-
// Delete dynamic configs that might have been modified during tests
97-
// This resets them to their initial static configuration values
98-
tEnv.executeSql(
99-
String.format(
100-
"Call %s.sys.set_cluster_config('%s')",
101-
CATALOG_NAME,
102-
ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()))
103-
.await();
104-
tEnv.executeSql(
105-
String.format(
106-
"Call %s.sys.set_cluster_config('%s')",
107-
CATALOG_NAME, ConfigOptions.DATALAKE_FORMAT.key()))
108-
.await();
109-
} catch (Exception e) {
110-
// Ignore cleanup errors to avoid masking test failures
111-
}
112-
}
113-
}
114-
11589
@Test
11690
void testShowProcedures() throws Exception {
11791
try (CloseableIterator<Row> showProceduresIterator =
@@ -330,6 +304,14 @@ void testGetClusterConfig() throws Exception {
330304
.asString()
331305
.contains("Configuration key 'non.existent.config' not found");
332306
}
307+
308+
// reset cluster configs.
309+
tEnv.executeSql(
310+
String.format(
311+
"Call %s.sys.set_cluster_config('%s')",
312+
CATALOG_NAME,
313+
ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()))
314+
.await();
333315
}
334316

335317
@Test
@@ -362,6 +344,14 @@ void testSetClusterConfig() throws Exception {
362344
assertThat(results).hasSize(1);
363345
assertThat(results.get(0).getField(1)).isEqualTo("200MB");
364346
}
347+
348+
// reset cluster configs.
349+
tEnv.executeSql(
350+
String.format(
351+
"Call %s.sys.set_cluster_config('%s')",
352+
CATALOG_NAME,
353+
ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()))
354+
.await();
365355
}
366356

367357
@Test

0 commit comments

Comments
 (0)