2929
3030import javax .annotation .Nonnull ;
3131import javax .annotation .concurrent .ThreadSafe ;
32+ import java .util .Map ;
3233import java .util .concurrent .*;
3334
3435/**
4142public final class ConseqExecutor implements SequentialExecutor {
4243 private static final int DEFAULT_CONCURRENCY = Math .max (16 , Runtime .getRuntime ().availableProcessors ());
4344 private static final int DEFAULT_WORK_QUEUE_CAPACITY = Integer .MAX_VALUE ;
44- private final ConcurrentMap <Object , CompletableFuture <?>> sequentialExecutors ;
45+ private static final ThreadPoolExecutor .AbortPolicy DEFAULT_REJECTED_HANDLER = new ThreadPoolExecutor .AbortPolicy ();
46+ private final Map <Object , CompletableFuture <?>> sequentialExecutors = new ConcurrentHashMap <>();
47+ private final ExecutorService adminThreadPool = Executors .newCachedThreadPool ();
4548 /**
4649 * The worker thread pool facilitates the overall async execution, independent of the submitted tasks. Any thread
4750 * from the pool can be used to execute any task, regardless of sequence keys. The pool capacity decides the overall
4851 * max parallelism of task execution.
4952 */
5053 private final ExecutorService workerThreadPool ;
51- private final ExecutorService adminThreadPool = Executors .newCachedThreadPool ();
5254
53- private ConseqExecutor (int concurrency , int workQueueCapacity ) {
54- this .workerThreadPool =
55- workQueueCapacity == DEFAULT_WORK_QUEUE_CAPACITY ? Executors .newFixedThreadPool (concurrency ) :
56- new ThreadPoolExecutor (concurrency ,
57- concurrency ,
58- 0 ,
59- TimeUnit .MILLISECONDS ,
60- new ArrayBlockingQueue <>(workQueueCapacity ),
61- new BlockingRetryHandler ());
62- sequentialExecutors = new ConcurrentHashMap <>(concurrency );
55+ private ConseqExecutor (@ Nonnull Builder builder ) {
56+ this (new ThreadPoolExecutor (builder .concurrency ,
57+ builder .concurrency ,
58+ 0 ,
59+ TimeUnit .MILLISECONDS ,
60+ builder .workQueueCapacity == DEFAULT_WORK_QUEUE_CAPACITY ? new LinkedBlockingQueue <>() :
61+ new ArrayBlockingQueue <>(builder .workQueueCapacity ),
62+ Executors .defaultThreadFactory (),
63+ builder .rejectedPolicy ));
6364 }
6465
65- private ConseqExecutor (@ Nonnull Builder builder ) {
66- this ( builder . concurrency , builder . workQueueCapacity ) ;
66+ private ConseqExecutor (ThreadPoolExecutor workerThreadPool ) {
67+ this . workerThreadPool = workerThreadPool ;
6768 }
6869
6970 /**
7071 * @return conseq executor with default concurrency
7172 */
7273 public static @ Nonnull ConseqExecutor newInstance () {
73- return new ConseqExecutor ( DEFAULT_CONCURRENCY , DEFAULT_WORK_QUEUE_CAPACITY );
74+ return new Builder (). build ( );
7475 }
7576
7677 /**
@@ -80,7 +81,16 @@ private ConseqExecutor(@Nonnull Builder builder) {
8081 * @return conseq executor with given concurrency
8182 */
8283 public static @ Nonnull ConseqExecutor newInstance (int concurrency ) {
83- return new ConseqExecutor (concurrency , DEFAULT_WORK_QUEUE_CAPACITY );
84+ return new Builder ().concurrency (concurrency ).build ();
85+ }
86+
87+ /**
88+ * @param workerThreadPool
89+ * the worker thread pool that facilitates the overall async execution, independent of the submitted tasks.
90+ * @return new ConseqExecutor instance backed by the specified workerThreadPool
91+ */
92+ public static ConseqExecutor from (ThreadPoolExecutor workerThreadPool ) {
93+ return new ConseqExecutor (workerThreadPool );
8494 }
8595
8696 private static <T > T call (Callable <T > task ) {
@@ -155,6 +165,15 @@ public <T> CompletableFuture<T> submit(@NonNull Callable<T> task, @NonNull Objec
155165 @ Override
156166 public void shutdown () {
157167 this .workerThreadPool .shutdown ();
168+ while (true ) {
169+ try {
170+ if (this .workerThreadPool .awaitTermination (100 , TimeUnit .MILLISECONDS )) {
171+ break ;
172+ }
173+ } catch (InterruptedException e ) {
174+ Thread .currentThread ().interrupt ();
175+ }
176+ }
158177 this .adminThreadPool .shutdown ();
159178 }
160179
@@ -167,51 +186,21 @@ int estimateActiveExecutorCount() {
167186 return this .sequentialExecutors .size ();
168187 }
169188
170- static class BlockingRetryHandler implements RejectedExecutionHandler {
171- private static void blockingRetry (Runnable r , @ NonNull ThreadPoolExecutor executor ) {
172- if (executor .isTerminated ()) {
173- return ;
174- }
175- BlockingQueue <Runnable > workQueue = executor .getQueue ();
176- if (workQueue .offer (r )) {
177- return ;
178- }
179- boolean interrupted = false ;
180- try {
181- while (true ) {
182- try {
183- workQueue .put (r );
184- break ;
185- } catch (InterruptedException e ) {
186- interrupted = true ;
187- }
188- }
189- } finally {
190- if (interrupted ) {
191- Thread .currentThread ().interrupt ();
192- }
193- }
194- }
195-
196- @ Override
197- public void rejectedExecution (Runnable r , ThreadPoolExecutor executor ) {
198- blockingRetry (r , executor );
199- }
200- }
201-
202189 /**
203190 * {@code ConseqExecutor} builder static inner class.
204191 */
205192 public static final class Builder {
206193 private int concurrency ;
207194 private int workQueueCapacity ;
195+ private RejectedExecutionHandler rejectedPolicy ;
208196
209197 /**
210198 *
211199 */
212200 public Builder () {
213201 concurrency = DEFAULT_CONCURRENCY ;
214202 workQueueCapacity = DEFAULT_WORK_QUEUE_CAPACITY ;
203+ rejectedPolicy = DEFAULT_REJECTED_HANDLER ;
215204 }
216205
217206 /**
@@ -238,6 +227,17 @@ public Builder workQueueCapacity(int workQueueCapacity) {
238227 return this ;
239228 }
240229
230+ /**
231+ * @param rejectedPolicy
232+ * handler executed by the caller thread if a task is rejected by the conseq executor, maybe e.g.
233+ * because of work queue is full. Default is {@link ThreadPoolExecutor.AbortPolicy}
234+ * @return a reference to this Builder
235+ */
236+ public Builder rejectedPolicy (RejectedExecutionHandler rejectedPolicy ) {
237+ this .rejectedPolicy = rejectedPolicy ;
238+ return this ;
239+ }
240+
241241 /**
242242 * Returns a {@code ConseqExecutor} built from the parameters previously set.
243243 *
0 commit comments