Skip to content

Commit b8d3e7e

Browse files
committed
+ using the same executor for both work and admin tasks
1 parent 14c5e9c commit b8d3e7e

File tree

2 files changed

+18
-20
lines changed

2 files changed

+18
-20
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
<modelVersion>4.0.0</modelVersion>
2828
<groupId>io.github.q3769</groupId>
2929
<artifactId>conseq4j</artifactId>
30-
<version>20231102.0.20231105</version>
30+
<version>20231102.0.20240511</version>
3131
<packaging>jar</packaging>
3232
<name>conseq4j</name>
3333
<description>A Java concurrent API to sequence related tasks while concurring unrelated ones</description>

src/main/java/conseq4j/execute/ConseqExecutor.java

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,15 @@
3030
import java.time.Duration;
3131
import java.util.List;
3232
import java.util.Map;
33-
import java.util.concurrent.*;
33+
import java.util.concurrent.Callable;
34+
import java.util.concurrent.CompletableFuture;
35+
import java.util.concurrent.CompletionException;
36+
import java.util.concurrent.ConcurrentHashMap;
37+
import java.util.concurrent.ConcurrentMap;
38+
import java.util.concurrent.ExecutorService;
39+
import java.util.concurrent.Executors;
40+
import java.util.concurrent.ForkJoinPool;
41+
import java.util.concurrent.Future;
3442
import javax.annotation.Nonnull;
3543
import javax.annotation.concurrent.ThreadSafe;
3644
import lombok.NonNull;
@@ -48,7 +56,6 @@ public final class ConseqExecutor implements SequentialExecutor, Terminable, Aut
4856

4957
private static final int DEFAULT_CONCURRENCY = Runtime.getRuntime().availableProcessors();
5058
private final Map<Object, CompletableFuture<?>> activeSequentialTasks = new ConcurrentHashMap<>();
51-
private final ExecutorService adminService = Executors.newSingleThreadExecutor();
5259
/**
5360
* The worker thread pool facilitates the overall async execution, independent of the submitted tasks. Any thread
5461
* from the pool can be used to execute any task, regardless of sequence keys. The pool capacity decides the overall
@@ -105,7 +112,7 @@ private static ConditionFactory awaitForever() {
105112
}
106113

107114
/**
108-
* @param command the command to run asynchronously in proper sequence
115+
* @param command the command to run asynchronously in proper sequence
109116
* @param sequenceKey the key under which this task should be sequenced
110117
* @return future result of the command, not downcast-able from the basic {@link Future} interface.
111118
* @see ConseqExecutor#submit(Callable, Object)
@@ -144,7 +151,7 @@ public CompletableFuture<Void> execute(@NonNull Runnable command, @NonNull Objec
144151
* task/stage is never added to the task work queue on the executor map and has no effect on the overall
145152
* sequential-ness of the work stage executions.
146153
*
147-
* @param task the task to be called asynchronously with proper sequence
154+
* @param task the task to be called asynchronously with proper sequence
148155
* @param sequenceKey the key under which this task should be sequenced
149156
* @return future result of the task, not downcast-able from the basic {@link Future} interface.
150157
*/
@@ -155,12 +162,10 @@ public <T> CompletableFuture<T> submit(@NonNull Callable<T> task, @NonNull Objec
155162
sequenceKey,
156163
(k, presentTask) -> (presentTask == null)
157164
? CompletableFuture.supplyAsync(() -> call(task), workerExecutorService)
158-
: presentTask.handleAsync((r, e) -> call(task), workerExecutorService));
165+
: presentTask.handleAsync((r, e) -> call(task)));
159166
CompletableFuture<?> copy = latestTask.thenApply(result -> result);
160-
latestTask.whenCompleteAsync(
161-
(r, e) -> activeSequentialTasks.computeIfPresent(
162-
sequenceKey, (k, checkedTask) -> checkedTask.isDone() ? null : checkedTask),
163-
adminService);
167+
latestTask.whenCompleteAsync((r, e) -> activeSequentialTasks.computeIfPresent(
168+
sequenceKey, (k, checkedTask) -> checkedTask.isDone() ? null : checkedTask));
164169
return (CompletableFuture<T>) copy;
165170
}
166171

@@ -181,23 +186,16 @@ boolean noTaskPending() {
181186

182187
@Override
183188
public void terminate() {
184-
new Thread(() -> {
185-
workerExecutorService.shutdown();
186-
awaitForever().until(this::noTaskPending);
187-
adminService.shutdown();
188-
})
189-
.start();
189+
workerExecutorService.shutdown();
190190
}
191191

192192
@Override
193193
public boolean isTerminated() {
194-
return workerExecutorService.isTerminated() && adminService.isTerminated();
194+
return workerExecutorService.isTerminated();
195195
}
196196

197197
@Override
198198
public @Nonnull List<Runnable> terminateNow() {
199-
List<Runnable> neverStartedTasks = workerExecutorService.shutdownNow();
200-
adminService.shutdownNow();
201-
return neverStartedTasks;
199+
return workerExecutorService.shutdownNow();
202200
}
203201
}

0 commit comments

Comments
 (0)