Skip to content

Commit f01ab4a

Browse files
committed
+ Terminable api change
+ replaced ThreadPoolExecutor with ForkJointPool
1 parent a80964d commit f01ab4a

File tree

5 files changed

+57
-161
lines changed

5 files changed

+57
-161
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
<modelVersion>4.0.0</modelVersion>
2828
<groupId>io.github.q3769</groupId>
2929
<artifactId>conseq4j</artifactId>
30-
<version>20230619.20230729.20230730</version>
30+
<version>20230918.0.0</version>
3131
<packaging>jar</packaging>
3232
<name>conseq4j</name>
3333
<description>A Java concurrent API to sequence related tasks while concurring unrelated ones</description>

src/main/java/conseq4j/Terminable.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424

2525
package conseq4j;
2626

27+
import java.util.List;
28+
2729
/**
2830
*
2931
*/
@@ -43,7 +45,7 @@ public interface Terminable {
4345
boolean isTerminated();
4446

4547
/**
46-
* @return true if, by estimation, no worker/thread is actively executing tasks.
48+
* @return Tasks submitted but never started executing
4749
*/
48-
boolean isIdle();
50+
List<Runnable> shutdownNow();
4951
}

src/main/java/conseq4j/execute/ConseqExecutor.java

Lines changed: 24 additions & 127 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
import javax.annotation.Nonnull;
3333
import javax.annotation.concurrent.ThreadSafe;
34+
import java.util.List;
3435
import java.util.Map;
3536
import java.util.concurrent.*;
3637

@@ -42,10 +43,6 @@
4243
@ThreadSafe
4344
@ToString
4445
public final class ConseqExecutor implements SequentialExecutor {
45-
private static final int DEFAULT_CONCURRENCY = Math.max(16, Runtime.getRuntime().availableProcessors());
46-
private static final int DEFAULT_WORK_QUEUE_CAPACITY = Integer.MAX_VALUE;
47-
private static final RejectedExecutionHandler DEFAULT_REJECTED_HANDLER = new ThreadPoolExecutor.AbortPolicy();
48-
private static final Builder.WorkQueueType DEFAULT_WORK_QUEUE_TYPE = Builder.WorkQueueType.LINKED;
4946

5047
private final Map<Object, CompletableFuture<?>> activeSequentialTasks = new ConcurrentHashMap<>();
5148
private final ExecutorService adminService = Executors.newSingleThreadExecutor();
@@ -54,49 +51,36 @@ public final class ConseqExecutor implements SequentialExecutor {
5451
* from the pool can be used to execute any task, regardless of sequence keys. The pool capacity decides the overall
5552
* max parallelism of task execution.
5653
*/
57-
private final ThreadPoolExecutor workerThreadPool;
54+
private final ExecutorService workerExecutorService;
5855
private final ConditionFactory await = Awaitility.await().forever();
5956

60-
private ConseqExecutor(@Nonnull Builder builder) {
61-
this(new ThreadPoolExecutor(builder.concurrency,
62-
builder.concurrency,
63-
0,
64-
TimeUnit.MILLISECONDS,
65-
builder.workQueueType == Builder.WorkQueueType.ARRAY ?
66-
new ArrayBlockingQueue<>(builder.workQueueCapacity) :
67-
new LinkedBlockingQueue<>(builder.workQueueCapacity),
68-
Executors.defaultThreadFactory(),
69-
builder.rejectedExecutionHandler));
70-
}
71-
72-
private ConseqExecutor(ThreadPoolExecutor workerThreadPool) {
73-
this.workerThreadPool = workerThreadPool;
57+
private ConseqExecutor(ExecutorService workerExecutorService) {
58+
this.workerExecutorService = workerExecutorService;
7459
}
7560

7661
/**
7762
* @return conseq executor with default concurrency
7863
*/
79-
public static @Nonnull ConseqExecutor newInstance() {
80-
return new Builder().build();
64+
public static @Nonnull ConseqExecutor instance() {
65+
return instance(Runtime.getRuntime().availableProcessors());
8166
}
8267

8368
/**
8469
* @param concurrency
85-
* max number of tasks that can be run in parallel by the returned executor instance. This is set as the max
86-
* capacity of the {@link #workerThreadPool}
70+
* max number of tasks that can be run in parallel by the returned executor instance.
8771
* @return conseq executor with given concurrency
8872
*/
89-
public static @Nonnull ConseqExecutor newInstance(int concurrency) {
90-
return new Builder().concurrency(concurrency).build();
73+
public static @Nonnull ConseqExecutor instance(int concurrency) {
74+
return instance(new ForkJoinPool(concurrency, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true));
9175
}
9276

9377
/**
94-
* @param workerThreadPool
95-
* the worker thread pool that facilitates the overall async execution, independent of the submitted tasks.
96-
* @return new ConseqExecutor instance backed by the specified workerThreadPool
78+
* @param workerExecutorService
79+
* ExecutorService that backs the async operations of worker threads
80+
* @return instance of {@link ConseqExecutor}
9781
*/
98-
public static ConseqExecutor from(ThreadPoolExecutor workerThreadPool) {
99-
return new ConseqExecutor(workerThreadPool);
82+
public static @Nonnull ConseqExecutor instance(ExecutorService workerExecutorService) {
83+
return new ConseqExecutor(workerExecutorService);
10084
}
10185

10286
private static <T> T call(Callable<T> task) {
@@ -122,7 +106,7 @@ public CompletableFuture<Void> execute(@NonNull Runnable command, @NonNull Objec
122106

123107
/**
124108
* Tasks of different sequence keys execute in parallel, pending thread availability from the backing
125-
* {@link #workerThreadPool}.
109+
* {@link #workerExecutorService}.
126110
* <p>
127111
* Sequential execution of tasks under the same/equal sequence key is achieved by linearly chaining/queuing the
128112
* task/work stages of the same key, leveraging the {@link CompletableFuture} API.
@@ -160,8 +144,8 @@ public CompletableFuture<Void> execute(@NonNull Runnable command, @NonNull Objec
160144
public <T> CompletableFuture<T> submit(@NonNull Callable<T> task, @NonNull Object sequenceKey) {
161145
CompletableFuture<?> latestTask = activeSequentialTasks.compute(sequenceKey,
162146
(k, presentTask) -> (presentTask == null) ?
163-
CompletableFuture.supplyAsync(() -> call(task), workerThreadPool) :
164-
presentTask.handleAsync((r, e) -> call(task), workerThreadPool));
147+
CompletableFuture.supplyAsync(() -> call(task), workerExecutorService) :
148+
presentTask.handleAsync((r, e) -> call(task), workerExecutorService));
165149
latestTask.whenCompleteAsync((r, e) -> activeSequentialTasks.computeIfPresent(sequenceKey,
166150
(k, checkedTask) -> checkedTask.isDone() ? null : checkedTask), adminService);
167151
return (CompletableFuture<T>) latestTask.thenApply(r -> r);
@@ -171,113 +155,26 @@ public <T> CompletableFuture<T> submit(@NonNull Callable<T> task, @NonNull Objec
171155
public void shutdown() {
172156
ExecutorService asyncThread = Executors.newSingleThreadExecutor();
173157
asyncThread.execute(() -> {
174-
workerThreadPool.shutdown();
175-
await.until(this::isIdle);
158+
workerExecutorService.shutdown();
159+
await.until(activeSequentialTasks::isEmpty);
176160
adminService.shutdown();
177161
});
178162
asyncThread.shutdown();
179163
}
180164

181165
@Override
182166
public boolean isTerminated() {
183-
return this.workerThreadPool.isTerminated() && this.adminService.isTerminated();
167+
return this.workerExecutorService.isTerminated() && this.adminService.isTerminated();
184168
}
185169

186170
@Override
187-
public boolean isIdle() {
188-
return activeSequentialTasks.isEmpty() && workerThreadPool.getActiveCount() == 0;
171+
public List<Runnable> shutdownNow() {
172+
List<Runnable> neverStartedTasks = workerExecutorService.shutdownNow();
173+
adminService.shutdownNow();
174+
return neverStartedTasks;
189175
}
190176

191177
int estimateActiveExecutorCount() {
192178
return activeSequentialTasks.size();
193179
}
194-
195-
/**
196-
* {@code ConseqExecutor} builder static inner class.
197-
*/
198-
public static final class Builder {
199-
private int concurrency;
200-
private int workQueueCapacity;
201-
private WorkQueueType workQueueType;
202-
private RejectedExecutionHandler rejectedExecutionHandler;
203-
204-
/**
205-
*
206-
*/
207-
public Builder() {
208-
concurrency = DEFAULT_CONCURRENCY;
209-
workQueueCapacity = DEFAULT_WORK_QUEUE_CAPACITY;
210-
rejectedExecutionHandler = DEFAULT_REJECTED_HANDLER;
211-
workQueueType = DEFAULT_WORK_QUEUE_TYPE;
212-
}
213-
214-
/**
215-
* Sets the {@code concurrency} and returns a reference to this Builder enabling method chaining.
216-
*
217-
* @param concurrency
218-
* the {@code concurrency} to set
219-
* @return a reference to this Builder
220-
*/
221-
public Builder concurrency(int concurrency) {
222-
this.concurrency = concurrency;
223-
return this;
224-
}
225-
226-
/**
227-
* Sets the {@code workQueueCapacity} and returns a reference to this Builder enabling method chaining.
228-
*
229-
* @param workQueueCapacity
230-
* the {@code workQueueCapacity} to set
231-
* @return a reference to this Builder
232-
*/
233-
public Builder workQueueCapacity(int workQueueCapacity) {
234-
this.workQueueCapacity = workQueueCapacity;
235-
return this;
236-
}
237-
238-
/**
239-
* @param rejectedExecutionHandler
240-
* handler executed by the caller thread if a task is rejected by the conseq executor, maybe e.g.
241-
* because of work queue is full. Default is {@link ThreadPoolExecutor.AbortPolicy}
242-
* @return a reference to this Builder
243-
*/
244-
public Builder rejectedExecutionHandler(RejectedExecutionHandler rejectedExecutionHandler) {
245-
this.rejectedExecutionHandler = rejectedExecutionHandler;
246-
return this;
247-
}
248-
249-
/**
250-
* Returns a {@code ConseqExecutor} built from the parameters previously set.
251-
*
252-
* @return a {@code ConseqExecutor} built with parameters of this {@code ConseqExecutor.Builder}
253-
*/
254-
public @Nonnull ConseqExecutor build() {
255-
return new ConseqExecutor(this);
256-
}
257-
258-
/**
259-
* @param workQueueType
260-
* {@link WorkQueueType#LINKED} meaning {@link LinkedBlockingQueue}, or {@link WorkQueueType#ARRAY}
261-
* meaning {@link ArrayBlockingQueue}, default to {@link WorkQueueType#LINKED}
262-
* @return a reference to this Builder
263-
*/
264-
public Builder workQueueType(WorkQueueType workQueueType) {
265-
this.workQueueType = workQueueType;
266-
return this;
267-
}
268-
269-
/**
270-
*
271-
*/
272-
public enum WorkQueueType {
273-
/**
274-
*
275-
*/
276-
LINKED,
277-
/**
278-
*
279-
*/
280-
ARRAY
281-
}
282-
}
283180
}

src/main/java/conseq4j/summon/ConseqServiceFactory.java

Lines changed: 20 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,15 @@
2626
import lombok.ToString;
2727
import lombok.experimental.Delegate;
2828

29+
import javax.annotation.Nonnull;
2930
import javax.annotation.concurrent.ThreadSafe;
30-
import java.util.ArrayList;
31+
import java.util.Collection;
3132
import java.util.List;
3233
import java.util.Objects;
33-
import java.util.concurrent.*;
34+
import java.util.concurrent.ConcurrentHashMap;
35+
import java.util.concurrent.ConcurrentMap;
36+
import java.util.concurrent.ExecutorService;
37+
import java.util.concurrent.Executors;
3438
import java.util.stream.Collectors;
3539

3640
import static java.lang.Math.floorMod;
@@ -89,25 +93,21 @@ public ExecutorService getExecutorService(Object sequenceKey) {
8993

9094
@Override
9195
public void shutdown() {
92-
List<ExecutorService> executorServices = new ArrayList<>(this.sequentialExecutors.values());
93-
List<ExecutorService> shutdownDisabledExecutorServices = executorServices.stream()
94-
.filter(ShutdownDisabledExecutorService.class::isInstance)
95-
.collect(Collectors.toList());
96-
shutdownDisabledExecutorServices.stream()
97-
.map(ShutdownDisabledExecutorService.class::cast)
98-
.forEach(ShutdownDisabledExecutorService::shutdownDelegate);
99-
executorServices.removeAll(shutdownDisabledExecutorServices);
100-
executorServices.forEach(ExecutorService::shutdown);
96+
this.sequentialExecutors.values().forEach(ShutdownDisabledExecutorService::shutdownDelegate);
10197
}
10298

10399
@Override
104100
public boolean isTerminated() {
105-
return this.sequentialExecutors.values().parallelStream().allMatch(ExecutorService::isTerminated);
101+
return this.sequentialExecutors.values().stream().allMatch(ExecutorService::isTerminated);
106102
}
107103

108104
@Override
109-
public boolean isIdle() {
110-
return sequentialExecutors.values().stream().allMatch(e -> e.getActiveCount() == 0);
105+
public List<Runnable> shutdownNow() {
106+
return sequentialExecutors.values()
107+
.parallelStream()
108+
.map(ShutdownDisabledExecutorService::shutdownDelegateNow)
109+
.flatMap(Collection::stream)
110+
.collect(Collectors.toList());
111111
}
112112

113113
private int bucketOf(Object sequenceKey) {
@@ -149,22 +149,20 @@ public void shutdown() {
149149
* @see #shutdown()
150150
*/
151151
@Override
152+
@Nonnull
152153
public List<Runnable> shutdownNow() {
153154
throw new UnsupportedOperationException(SHUTDOWN_UNSUPPORTED_MESSAGE);
154155
}
155156

156-
public int getActiveCount() {
157-
if (!(delegate instanceof ThreadPoolExecutor)) {
158-
throw new UnsupportedOperationException(
159-
"Delegate " + delegate + " is not of type " + ThreadPoolExecutor.class);
160-
}
161-
return ((ThreadPoolExecutor) delegate).getActiveCount();
162-
}
163-
164157
void shutdownDelegate() {
165158
this.delegate.shutdown();
166159
}
167160

161+
@Nonnull
162+
List<Runnable> shutdownDelegateNow() {
163+
return this.delegate.shutdownNow();
164+
}
165+
168166
/**
169167
* Methods that require complete overriding instead of delegation/decoration
170168
*/

0 commit comments

Comments
 (0)