Skip to content

Commit 6b487a7

Browse files
authored
[server] Change the default value of 'coordinator.io-pool.size' from 1 to 10 (#617)
1 parent 4a91a2c commit 6b487a7

File tree

7 files changed

+64
-69
lines changed

7 files changed

+64
-69
lines changed

fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -226,12 +226,12 @@ public class ConfigOptions {
226226
public static final ConfigOption<Integer> COORDINATOR_IO_POOL_SIZE =
227227
key("coordinator.io-pool.size")
228228
.intType()
229-
.defaultValue(1)
229+
.defaultValue(10)
230230
.withDescription(
231231
"The size of the IO thread pool to run blocking operations for coordinator server. "
232232
+ "This includes discard unnecessary snapshot files. "
233233
+ "Increase this value if you experience slow unnecessary snapshot files clean. "
234-
+ "The default value is 1.");
234+
+ "The default value is 10.");
235235

236236
// ------------------------------------------------------------------------
237237
// ConfigOptions for Tablet Server

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,6 @@
8282
import com.alibaba.fluss.server.zk.data.TabletServerRegistration;
8383
import com.alibaba.fluss.server.zk.data.ZkData.PartitionIdsZNode;
8484
import com.alibaba.fluss.server.zk.data.ZkData.TableIdsZNode;
85-
import com.alibaba.fluss.utils.ExecutorUtils;
86-
import com.alibaba.fluss.utils.concurrent.ExecutorThreadFactory;
8785
import com.alibaba.fluss.utils.types.Tuple2;
8886

8987
import org.slf4j.Logger;
@@ -101,8 +99,6 @@
10199
import java.util.Set;
102100
import java.util.concurrent.CompletableFuture;
103101
import java.util.concurrent.ExecutorService;
104-
import java.util.concurrent.Executors;
105-
import java.util.concurrent.TimeUnit;
106102
import java.util.stream.Collectors;
107103

108104
import static com.alibaba.fluss.server.coordinator.statemachine.BucketState.OfflineBucket;
@@ -111,7 +107,6 @@
111107
import static com.alibaba.fluss.server.coordinator.statemachine.ReplicaState.OnlineReplica;
112108
import static com.alibaba.fluss.server.coordinator.statemachine.ReplicaState.ReplicaDeletionStarted;
113109
import static com.alibaba.fluss.server.coordinator.statemachine.ReplicaState.ReplicaDeletionSuccessful;
114-
import static com.alibaba.fluss.utils.Preconditions.checkArgument;
115110
import static com.alibaba.fluss.utils.concurrent.FutureUtils.completeFromCallable;
116111

117112
/** An implementation for {@link EventProcessor}. */
@@ -137,7 +132,6 @@ public class CoordinatorEventProcessor implements EventProcessor {
137132
private final String internalListenerName;
138133

139134
private final CompletedSnapshotStoreManager completedSnapshotStoreManager;
140-
private final ExecutorService ioExecutor;
141135

142136
// in normal case, it won't be null, but from I can see, it'll only be null in unit test
143137
// since the we won't register a coordinator node in zk.
@@ -156,15 +150,17 @@ public CoordinatorEventProcessor(
156150
CoordinatorChannelManager coordinatorChannelManager,
157151
AutoPartitionManager autoPartitionManager,
158152
CoordinatorMetricGroup coordinatorMetricGroup,
159-
Configuration conf) {
153+
Configuration conf,
154+
ExecutorService ioExecutor) {
160155
this(
161156
zooKeeperClient,
162157
serverMetadataCache,
163158
coordinatorChannelManager,
164159
new CoordinatorContext(),
165160
autoPartitionManager,
166161
coordinatorMetricGroup,
167-
conf);
162+
conf,
163+
ioExecutor);
168164
}
169165

170166
public CoordinatorEventProcessor(
@@ -174,7 +170,8 @@ public CoordinatorEventProcessor(
174170
CoordinatorContext coordinatorContext,
175171
AutoPartitionManager autoPartitionManager,
176172
CoordinatorMetricGroup coordinatorMetricGroup,
177-
Configuration conf) {
173+
Configuration conf,
174+
ExecutorService ioExecutor) {
178175
this.zooKeeperClient = zooKeeperClient;
179176
this.serverMetadataCache = serverMetadataCache;
180177
this.coordinatorChannelManager = coordinatorChannelManager;
@@ -193,11 +190,6 @@ public CoordinatorEventProcessor(
193190
zooKeeperClient);
194191
this.metadataManager = new MetadataManager(zooKeeperClient, conf);
195192

196-
int ioExecutorPoolSize = conf.get(ConfigOptions.COORDINATOR_IO_POOL_SIZE);
197-
checkArgument(ioExecutorPoolSize > 0, "ioExecutorPoolSize must be positive");
198-
this.ioExecutor =
199-
Executors.newFixedThreadPool(
200-
ioExecutorPoolSize, new ExecutorThreadFactory("coordinator-io"));
201193
this.tableManager =
202194
new TableManager(
203195
metadataManager,
@@ -446,8 +438,6 @@ private void onShutdown() {
446438
// then stop watchers
447439
tableChangeWatcher.stop();
448440
tabletServerChangeWatcher.stop();
449-
// shutdown io executor
450-
ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, ioExecutor);
451441
}
452442

453443
@Override

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
import com.alibaba.fluss.server.zk.ZooKeeperUtils;
3939
import com.alibaba.fluss.server.zk.data.CoordinatorAddress;
4040
import com.alibaba.fluss.utils.ExceptionUtils;
41+
import com.alibaba.fluss.utils.ExecutorUtils;
42+
import com.alibaba.fluss.utils.concurrent.ExecutorThreadFactory;
4143
import com.alibaba.fluss.utils.concurrent.FutureUtils;
4244

4345
import org.slf4j.Logger;
@@ -50,6 +52,9 @@
5052
import java.util.List;
5153
import java.util.UUID;
5254
import java.util.concurrent.CompletableFuture;
55+
import java.util.concurrent.ExecutorService;
56+
import java.util.concurrent.Executors;
57+
import java.util.concurrent.TimeUnit;
5358
import java.util.concurrent.atomic.AtomicBoolean;
5459

5560
/**
@@ -111,6 +116,9 @@ public class CoordinatorServer extends ServerBase {
111116
@GuardedBy("lock")
112117
private AutoPartitionManager autoPartitionManager;
113118

119+
@GuardedBy("lock")
120+
private ExecutorService ioExecutor;
121+
114122
public CoordinatorServer(Configuration conf) {
115123
super(conf);
116124
validateConfigs(conf);
@@ -175,6 +183,11 @@ protected void startServices() throws Exception {
175183
new AutoPartitionManager(metadataCache, metadataManager, conf);
176184
autoPartitionManager.start();
177185

186+
int ioExecutorPoolSize = conf.get(ConfigOptions.COORDINATOR_IO_POOL_SIZE);
187+
this.ioExecutor =
188+
Executors.newFixedThreadPool(
189+
ioExecutorPoolSize, new ExecutorThreadFactory("coordinator-io"));
190+
178191
// start coordinator event processor after we register coordinator leader to zk
179192
// so that the event processor can get the coordinator leader node from zk during start
180193
// up.
@@ -187,7 +200,8 @@ protected void startServices() throws Exception {
187200
coordinatorChannelManager,
188201
autoPartitionManager,
189202
serverMetricGroup,
190-
conf);
203+
conf,
204+
ioExecutor);
191205
coordinatorEventProcessor.startup();
192206

193207
createDefaultDatabase();
@@ -268,6 +282,15 @@ CompletableFuture<Void> stopServices() {
268282
exception = ExceptionUtils.firstOrSuppressed(t, exception);
269283
}
270284

285+
try {
286+
if (ioExecutor != null) {
287+
// shutdown io executor
288+
ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, ioExecutor);
289+
}
290+
} catch (Throwable t) {
291+
exception = ExceptionUtils.firstOrSuppressed(t, exception);
292+
}
293+
271294
try {
272295
if (coordinatorEventProcessor != null) {
273296
coordinatorEventProcessor.shutdown();

fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletServer.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,13 @@ private static void validateConfigs(Configuration conf) {
400400
ConfigOptions.TABLET_SERVER_ID.key()));
401401
}
402402

403+
if (conf.get(ConfigOptions.BACKGROUND_THREADS) < 1) {
404+
throw new IllegalConfigurationException(
405+
String.format(
406+
"Invalid configuration for %s, it must be greater than or equal 1.",
407+
ConfigOptions.BACKGROUND_THREADS.key()));
408+
}
409+
403410
if (conf.get(ConfigOptions.REMOTE_DATA_DIR) == null) {
404411
throw new IllegalConfigurationException(
405412
String.format("Configuration %s must be set.", ConfigOptions.REMOTE_DATA_DIR));

fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessorTest.java

Lines changed: 19 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import com.alibaba.fluss.testutils.common.AllCallbackWrapper;
5757
import com.alibaba.fluss.types.DataTypes;
5858
import com.alibaba.fluss.utils.ExceptionUtils;
59+
import com.alibaba.fluss.utils.concurrent.ExecutorThreadFactory;
5960
import com.alibaba.fluss.utils.types.Tuple2;
6061

6162
import org.junit.jupiter.api.AfterEach;
@@ -75,6 +76,7 @@
7576
import java.util.Optional;
7677
import java.util.Set;
7778
import java.util.concurrent.CompletableFuture;
79+
import java.util.concurrent.Executors;
7880
import java.util.concurrent.TimeUnit;
7981
import java.util.function.Consumer;
8082
import java.util.function.Function;
@@ -152,14 +154,7 @@ void beforeEach() throws IOException {
152154
new AutoPartitionManager(serverMetadataCache, metadataManager, new Configuration());
153155
Configuration conf = new Configuration();
154156
conf.setString(ConfigOptions.REMOTE_DATA_DIR, "/tmp/fluss/remote-data");
155-
eventProcessor =
156-
new CoordinatorEventProcessor(
157-
zookeeperClient,
158-
serverMetadataCache,
159-
testCoordinatorChannelManager,
160-
autoPartitionManager,
161-
TestingMetricGroups.COORDINATOR_METRICS,
162-
new Configuration());
157+
eventProcessor = buildCoordinatorEventProcessor();
163158
eventProcessor.startup();
164159
metadataManager.createDatabase(
165160
defaultDatabase, DatabaseDescriptor.builder().build(), false);
@@ -221,14 +216,7 @@ void testCreateAndDropTable() throws Exception {
221216
metadataManager.dropTable(t2, false);
222217

223218
// start the coordinator
224-
eventProcessor =
225-
new CoordinatorEventProcessor(
226-
zookeeperClient,
227-
serverMetadataCache,
228-
testCoordinatorChannelManager,
229-
autoPartitionManager,
230-
TestingMetricGroups.COORDINATOR_METRICS,
231-
new Configuration());
219+
eventProcessor = buildCoordinatorEventProcessor();
232220
initCoordinatorChannel();
233221
eventProcessor.startup();
234222
// make sure the table can still be deleted successfully
@@ -386,14 +374,7 @@ void testServerBecomeOnlineAndOfflineLine() throws Exception {
386374

387375
// let's restart to check everything is ok
388376
eventProcessor.shutdown();
389-
eventProcessor =
390-
new CoordinatorEventProcessor(
391-
zookeeperClient,
392-
serverMetadataCache,
393-
testCoordinatorChannelManager,
394-
autoPartitionManager,
395-
TestingMetricGroups.COORDINATOR_METRICS,
396-
new Configuration());
377+
eventProcessor = buildCoordinatorEventProcessor();
397378

398379
// in this test case, so make requests to gateway should always be
399380
// successful for when start up, it will send request to tablet servers
@@ -431,14 +412,7 @@ void testRestartTriggerReplicaToOffline() throws Exception {
431412
// let's restart
432413
initCoordinatorChannel();
433414
eventProcessor.shutdown();
434-
eventProcessor =
435-
new CoordinatorEventProcessor(
436-
zookeeperClient,
437-
serverMetadataCache,
438-
testCoordinatorChannelManager,
439-
autoPartitionManager,
440-
TestingMetricGroups.COORDINATOR_METRICS,
441-
new Configuration());
415+
eventProcessor = buildCoordinatorEventProcessor();
442416
int failedServer = 0;
443417
initCoordinatorChannel(failedServer);
444418
eventProcessor.startup();
@@ -602,14 +576,7 @@ void testCreateAndDropPartition() throws Exception {
602576
metadataManager.dropTable(tablePath, false);
603577

604578
// start the coordinator
605-
eventProcessor =
606-
new CoordinatorEventProcessor(
607-
zookeeperClient,
608-
serverMetadataCache,
609-
testCoordinatorChannelManager,
610-
autoPartitionManager,
611-
TestingMetricGroups.COORDINATOR_METRICS,
612-
new Configuration());
579+
eventProcessor = buildCoordinatorEventProcessor();
613580
initCoordinatorChannel();
614581
eventProcessor.startup();
615582
verifyPartitionDropped(tableId, partition2Id);
@@ -656,14 +623,7 @@ void testRestartResumeDropPartition() throws Exception {
656623
zookeeperClient.deletePartition(tablePath, partition2Name);
657624

658625
// start the coordinator
659-
eventProcessor =
660-
new CoordinatorEventProcessor(
661-
zookeeperClient,
662-
serverMetadataCache,
663-
testCoordinatorChannelManager,
664-
autoPartitionManager,
665-
TestingMetricGroups.COORDINATOR_METRICS,
666-
new Configuration());
626+
eventProcessor = buildCoordinatorEventProcessor();
667627
initCoordinatorChannel();
668628
eventProcessor.startup();
669629

@@ -677,6 +637,17 @@ void testRestartResumeDropPartition() throws Exception {
677637
replicationFactor);
678638
}
679639

640+
private CoordinatorEventProcessor buildCoordinatorEventProcessor() {
641+
return new CoordinatorEventProcessor(
642+
zookeeperClient,
643+
serverMetadataCache,
644+
testCoordinatorChannelManager,
645+
autoPartitionManager,
646+
TestingMetricGroups.COORDINATOR_METRICS,
647+
new Configuration(),
648+
Executors.newFixedThreadPool(1, new ExecutorThreadFactory("test-coordinator-io")));
649+
}
650+
680651
private void initCoordinatorChannel() throws Exception {
681652
makeSendLeaderAndStopRequestAlwaysSuccess(
682653
testCoordinatorChannelManager,

fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import com.alibaba.fluss.server.zk.data.LeaderAndIsr;
4141
import com.alibaba.fluss.shaded.guava32.com.google.common.collect.Sets;
4242
import com.alibaba.fluss.testutils.common.AllCallbackWrapper;
43+
import com.alibaba.fluss.utils.concurrent.ExecutorThreadFactory;
4344

4445
import org.junit.jupiter.api.BeforeAll;
4546
import org.junit.jupiter.api.BeforeEach;
@@ -51,6 +52,7 @@
5152
import java.util.Arrays;
5253
import java.util.Collections;
5354
import java.util.Optional;
55+
import java.util.concurrent.Executors;
5456

5557
import static com.alibaba.fluss.server.coordinator.statemachine.BucketState.NewBucket;
5658
import static com.alibaba.fluss.server.coordinator.statemachine.BucketState.NonExistentBucket;
@@ -234,7 +236,9 @@ void testStateChangeToOnline() throws Exception {
234236
coordinatorContext,
235237
autoPartitionManager,
236238
TestingMetricGroups.COORDINATOR_METRICS,
237-
new Configuration());
239+
new Configuration(),
240+
Executors.newFixedThreadPool(
241+
1, new ExecutorThreadFactory("test-coordinator-io")));
238242
CoordinatorEventManager eventManager =
239243
new CoordinatorEventManager(coordinatorEventProcessor);
240244
coordinatorRequestBatch =

website/docs/maintenance/configuration.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ during the Fluss cluster working.
5757
|--------------------------|---------|---------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
5858
| coordinator.host | String | (None) | The config parameter defining the network address to connect to for communication with the coordinator server. If the coordinator server is used as a bootstrap server (discover all the servers in the cluster), the value of this config option should be a static hostname or address. |
5959
| coordinator.port | String | 9123 | The config parameter defining the network port to connect to for communication with the coordinator server. Like 'coordinator.host', if the coordinator server is used as a bootstrap server (discover all the servers in the cluster), the value of this config option should be a static port. Otherwise, the value can be set to "0" for a dynamic service name resolution. The value accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. |
60-
| coordinator.io-pool.size | Integer | 1 | The size of the IO thread pool to run blocking operations for coordinator server. This includes discard unnecessary snapshot files. Increase this value if you experience slow unnecessary snapshot files clean. The default value is 1. |
60+
| coordinator.io-pool.size | Integer | 10 | The size of the IO thread pool to run blocking operations for coordinator server. This includes discard unnecessary snapshot files. Increase this value if you experience slow unnecessary snapshot files clean. The default value is 10. |
6161
6262
## TabletServer
6363

0 commit comments

Comments
 (0)