Skip to content

Commit 4916bd3

Browse files
committed
Shutdown ExecutorService(s) in multi node pipelines (#3467)
* Shutdown ExecutorService(s) in multi node pipelines * Use only shutdownNow() * format import
1 parent 0b74c9e commit 4916bd3

File tree

2 files changed

+40
-2
lines changed

2 files changed

+40
-2
lines changed

src/main/java/redis/clients/jedis/MultiNodePipelineBase.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,6 @@ public abstract class MultiNodePipelineBase implements PipelineCommands, Pipelin
5858
private final CommandObjects commandObjects;
5959
private GraphCommandObjects graphCommandObjects;
6060

61-
private final ExecutorService executorService = Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS);
62-
6361
public MultiNodePipelineBase(CommandObjects commandObjects) {
6462
pipelinedResponses = new LinkedHashMap<>();
6563
connections = new LinkedHashMap<>();
@@ -121,6 +119,8 @@ public final void sync() {
121119
}
122120
syncing = true;
123121

122+
ExecutorService executorService = Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS);
123+
124124
CountDownLatch countDownLatch = new CountDownLatch(pipelinedResponses.size());
125125
Iterator<Map.Entry<HostAndPort, Queue<Response<?>>>> pipelinedResponsesIterator
126126
= pipelinedResponses.entrySet().iterator();
@@ -153,6 +153,8 @@ public final void sync() {
153153
log.error("Thread is interrupted during sync.", e);
154154
}
155155

156+
executorService.shutdownNow();
157+
156158
syncing = false;
157159
}
158160

src/test/java/redis/clients/jedis/ClusterPipeliningTest.java

+36
Original file line numberDiff line numberDiff line change
@@ -1051,4 +1051,40 @@ public void transaction() {
10511051
assertThrows(UnsupportedOperationException.class, () -> cluster.multi());
10521052
}
10531053
}
1054+
1055+
@Test(timeout = 10_000L)
1056+
public void multiple() {
1057+
final int maxTotal = 100;
1058+
ConnectionPoolConfig poolConfig = new ConnectionPoolConfig();
1059+
poolConfig.setMaxTotal(maxTotal);
1060+
try (JedisCluster cluster = new JedisCluster(nodes, DEFAULT_CLIENT_CONFIG, 5, poolConfig)) {
1061+
for (int i = 0; i < maxTotal; i++) {
1062+
assertThreadsCount();
1063+
String s = Integer.toString(i);
1064+
try (ClusterPipeline pipeline = cluster.pipelined()) {
1065+
pipeline.set(s, s);
1066+
pipeline.sync();
1067+
}
1068+
assertThreadsCount();
1069+
}
1070+
}
1071+
}
1072+
1073+
private static void assertThreadsCount() {
1074+
// Get the root thread group
1075+
final ThreadGroup rootGroup = Thread.currentThread().getThreadGroup().getParent();
1076+
1077+
// Create a buffer to store the thread information
1078+
final Thread[] threads = new Thread[rootGroup.activeCount()];
1079+
1080+
// Enumerate all threads into the buffer
1081+
rootGroup.enumerate(threads);
1082+
1083+
// Assert information about threads
1084+
final int count = (int) Arrays.stream(threads)
1085+
.filter(thread -> thread != null && thread.getName() != null
1086+
&& thread.getName().startsWith("pool-"))
1087+
.count();
1088+
assertTrue(count < 9);
1089+
}
10541090
}

0 commit comments

Comments
 (0)