Skip to content

Commit f3a5745

Browse files
Support shared RocksDB rate limiter in Fluss
1 parent 872554b commit f3a5745

File tree

13 files changed

+608
-14
lines changed

13 files changed

+608
-14
lines changed

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1524,6 +1524,16 @@ public class ConfigOptions {
15241524
"The max size of the consumed memory for RocksDB batch write, "
15251525
+ "will flush just based on item count if this config set to 0.");
15261526

1527+
public static final ConfigOption<MemorySize> KV_SHARED_RATE_LIMITER_BYTES_PER_SEC =
1528+
key("kv.rocksdb.shared-rate-limiter.bytes-per-sec")
1529+
.memoryType()
1530+
.defaultValue(MemorySize.parse("100mb"))
1531+
.withDescription(
1532+
"The shared rate limit in bytes per second for RocksDB flush and compaction operations "
1533+
+ "across all RocksDB instances in the TabletServer. "
1534+
+ "All KV tablets share a single global RateLimiter to prevent disk IO from being saturated. "
1535+
+ "The default value is `100MB/s`. Set to 0 to disable rate limiting.");
1536+
15271537
// --------------------------------------------------------------------------
15281538
// Provided configurable ColumnFamilyOptions within Fluss
15291539
// --------------------------------------------------------------------------
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.flink.procedure;
19+
20+
import org.apache.fluss.config.ConfigOptions;
21+
import org.apache.fluss.config.MemorySize;
22+
import org.apache.fluss.config.cluster.ConfigEntry;
23+
24+
import org.apache.flink.table.annotation.ProcedureHint;
25+
import org.apache.flink.table.procedure.ProcedureContext;
26+
27+
import java.util.Collection;
28+
29+
/**
30+
* Procedure to get current RocksDB rate limiter configuration from cluster config.
31+
*
32+
* <p>Usage:
33+
*
34+
* <pre>
35+
* CALL sys.get_shared_rocksdb_rate_limiter();
36+
* </pre>
37+
*/
38+
public class GetRocksDBRateLimiterProcedure extends ProcedureBase {
39+
40+
@ProcedureHint(argument = {})
41+
public String[] call(ProcedureContext context) throws Exception {
42+
43+
try {
44+
// Get cluster configuration
45+
Collection<ConfigEntry> configs = admin.describeClusterConfigs().get();
46+
47+
// Find shared rate limiter configuration
48+
String rateLimiterKey = ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key();
49+
50+
for (ConfigEntry entry : configs) {
51+
if (entry.key().equals(rateLimiterKey)) {
52+
String value = entry.value();
53+
long bytesPerSec = Long.parseLong(value);
54+
55+
if (bytesPerSec == 0) {
56+
return new String[] {
57+
"Shared RocksDB rate limiter is disabled (0 bytes/sec)"
58+
};
59+
}
60+
61+
String source =
62+
entry.source() != null ? " [source: " + entry.source() + "]" : "";
63+
return new String[] {
64+
String.format(
65+
"Shared RocksDB rate limiter: %s%s",
66+
new MemorySize(bytesPerSec).toHumanReadableString(), source)
67+
};
68+
}
69+
}
70+
71+
// Not found, return default value
72+
MemorySize defaultValue =
73+
ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.defaultValue();
74+
75+
return new String[] {
76+
String.format(
77+
"Shared RocksDB rate limiter: %s (default)",
78+
defaultValue.toHumanReadableString())
79+
};
80+
81+
} catch (Exception e) {
82+
throw new RuntimeException(
83+
"Failed to get shared RocksDB rate limiter config: " + e.getMessage(), e);
84+
}
85+
}
86+
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.flink.procedure;
19+
20+
import org.apache.fluss.config.ConfigOptions;
21+
import org.apache.fluss.config.MemorySize;
22+
import org.apache.fluss.config.cluster.ConfigEntry;
23+
24+
import org.apache.flink.table.annotation.ProcedureHint;
25+
import org.apache.flink.table.procedure.ProcedureContext;
26+
27+
import java.util.Collection;
28+
29+
/**
30+
* Procedure to get current RocksDB rate limiter configuration from cluster config.
31+
*
32+
* <p>Usage:
33+
*
34+
* <pre>
35+
* CALL sys.get_shared_rocksdb_rate_limiter();
36+
* </pre>
37+
*/
38+
public class GetSharedRocksDBRateLimiterProcedure extends ProcedureBase {
39+
40+
@ProcedureHint(argument = {})
41+
public String[] call(ProcedureContext context) throws Exception {
42+
43+
try {
44+
// Get cluster configuration
45+
Collection<ConfigEntry> configs = admin.describeClusterConfigs().get();
46+
47+
// Find shared rate limiter configuration
48+
String rateLimiterKey = ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key();
49+
50+
for (ConfigEntry entry : configs) {
51+
if (entry.key().equals(rateLimiterKey)) {
52+
String value = entry.value();
53+
long bytesPerSec = Long.parseLong(value);
54+
55+
if (bytesPerSec == 0) {
56+
return new String[] {
57+
"Shared RocksDB rate limiter is disabled (0 bytes/sec)"
58+
};
59+
}
60+
61+
String source =
62+
entry.source() != null ? " [source: " + entry.source() + "]" : "";
63+
return new String[] {
64+
String.format(
65+
"Shared RocksDB rate limiter: %s%s",
66+
new MemorySize(bytesPerSec).toHumanReadableString(), source)
67+
};
68+
}
69+
}
70+
71+
// Not found, return default value
72+
MemorySize defaultValue =
73+
ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.defaultValue();
74+
75+
return new String[] {
76+
String.format(
77+
"Shared RocksDB rate limiter: %s (default)",
78+
defaultValue.toHumanReadableString())
79+
};
80+
81+
} catch (Exception e) {
82+
throw new RuntimeException(
83+
"Failed to get shared RocksDB rate limiter config: " + e.getMessage(), e);
84+
}
85+
}
86+
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ProcedureManager.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,11 @@ private static Map<String, Class<? extends ProcedureBase>> initProcedureMap() {
6969
private enum ProcedureEnum {
7070
ADD_ACL("sys.add_acl", AddAclProcedure.class),
7171
DROP_ACL("sys.drop_acl", DropAclProcedure.class),
72-
List_ACL("sys.list_acl", ListAclProcedure.class);
72+
List_ACL("sys.list_acl", ListAclProcedure.class),
73+
SET_SHARED_ROCKSDB_RATE_LIMITER(
74+
"sys.set_shared_rocksdb_rate_limiter", SetSharedRocksDBRateLimiterProcedure.class),
75+
GET_SHARED_ROCKSDB_RATE_LIMITER(
76+
"sys.get_shared_rocksdb_rate_limiter", GetSharedRocksDBRateLimiterProcedure.class);
7377

7478
private final String path;
7579
private final Class<? extends ProcedureBase> procedureClass;
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.flink.procedure;
19+
20+
import org.apache.fluss.config.ConfigOptions;
21+
import org.apache.fluss.config.MemorySize;
22+
import org.apache.fluss.config.cluster.AlterConfig;
23+
import org.apache.fluss.config.cluster.AlterConfigOpType;
24+
25+
import org.apache.flink.table.annotation.ArgumentHint;
26+
import org.apache.flink.table.annotation.DataTypeHint;
27+
import org.apache.flink.table.annotation.ProcedureHint;
28+
import org.apache.flink.table.procedure.ProcedureContext;
29+
30+
import java.util.Collections;
31+
32+
/**
33+
* Procedure to set RocksDB rate limiter dynamically via cluster configuration. The configuration
34+
* will be persisted in ZooKeeper and applied to all TabletServers.
35+
*
36+
* <p>Usage examples:
37+
*
38+
* <pre>
39+
* -- Set rate limiter to 200MB/s
40+
* CALL sys.set_shared_rocksdb_rate_limiter('200MB');
41+
*
42+
* -- Set rate limiter to 1GB/s
43+
* CALL sys.set_shared_rocksdb_rate_limiter('1GB');
44+
* </pre>
45+
*/
46+
public class SetSharedRocksDBRateLimiterProcedure extends ProcedureBase {
47+
48+
@ProcedureHint(
49+
argument = {@ArgumentHint(name = "bytes_per_second", type = @DataTypeHint("STRING"))})
50+
public String[] call(ProcedureContext context, String bytesPerSecondStr) throws Exception {
51+
52+
try {
53+
// Parse memory size (supports formats like "100MB", "1GB", etc.)
54+
long bytesPerSecond = MemorySize.parse(bytesPerSecondStr).getBytes();
55+
56+
// Construct configuration modification operation
57+
AlterConfig alterConfig =
58+
new AlterConfig(
59+
ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key(),
60+
String.valueOf(bytesPerSecond),
61+
AlterConfigOpType.SET);
62+
63+
// Call Admin API to modify cluster configuration
64+
admin.alterClusterConfigs(Collections.singletonList(alterConfig)).get();
65+
66+
return new String[] {
67+
String.format(
68+
"Successfully set shared RocksDB rate limiter to %s (%d bytes/sec) for all TabletServers. "
69+
+ "The configuration is persisted in ZooKeeper and will survive server restarts.",
70+
bytesPerSecondStr, bytesPerSecond)
71+
};
72+
73+
} catch (Exception e) {
74+
// Extract useful error information
75+
String errorMsg = e.getMessage();
76+
if (e.getCause() != null) {
77+
errorMsg = e.getCause().getMessage();
78+
}
79+
80+
return new String[] {"Failed to set shared RocksDB rate limiter: " + errorMsg};
81+
}
82+
}
83+
}

0 commit comments

Comments
 (0)