Skip to content

Commit 24179d5

Browse files
committed
+ added blocking shutdown via AutoCloseable
1 parent fef7bde commit 24179d5

File tree

3 files changed

+23
-5
lines changed

3 files changed

+23
-5
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
<modelVersion>4.0.0</modelVersion>
2828
<groupId>io.github.q3769</groupId>
2929
<artifactId>conseq4j</artifactId>
30-
<version>20230922.0.20230924</version>
30+
<version>20230922.20230924.0</version>
3131
<packaging>jar</packaging>
3232
<name>conseq4j</name>
3333
<description>A Java concurrent API to sequence related tasks while concurring unrelated ones</description>

src/main/java/conseq4j/execute/ConseqExecutor.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
*/
4343
@ThreadSafe
4444
@ToString
45-
public final class ConseqExecutor implements SequentialExecutor {
45+
public final class ConseqExecutor implements SequentialExecutor, AutoCloseable {
4646

4747
private final Map<Object, CompletableFuture<?>> activeSequentialTasks = new ConcurrentHashMap<>();
4848
private final ExecutorService adminService = Executors.newSingleThreadExecutor();
@@ -52,7 +52,6 @@ public final class ConseqExecutor implements SequentialExecutor {
5252
* max parallelism of task execution.
5353
*/
5454
private final ExecutorService workerExecutorService;
55-
private final ConditionFactory await = Awaitility.await().forever();
5655

5756
private ConseqExecutor(ExecutorService workerExecutorService) {
5857
this.workerExecutorService = workerExecutorService;
@@ -83,6 +82,10 @@ private ConseqExecutor(ExecutorService workerExecutorService) {
8382
return new ConseqExecutor(workerExecutorService);
8483
}
8584

85+
private static ConditionFactory await() {
86+
return Awaitility.await().forever();
87+
}
88+
8689
private static <T> T call(Callable<T> task) {
8790
try {
8891
return task.call();
@@ -156,7 +159,7 @@ public <T> CompletableFuture<T> submit(@NonNull Callable<T> task, @NonNull Objec
156159
public void shutdown() {
157160
new Thread(() -> {
158161
workerExecutorService.shutdown();
159-
await.until(activeSequentialTasks::isEmpty);
162+
await().until(activeSequentialTasks::isEmpty);
160163
adminService.shutdown();
161164
}).start();
162165
}
@@ -173,6 +176,14 @@ public boolean isTerminated() {
173176
return neverStartedTasks;
174177
}
175178

179+
@Override
180+
public void close() {
181+
workerExecutorService.shutdown();
182+
await().until(activeSequentialTasks::isEmpty);
183+
adminService.shutdown();
184+
await().until(this::isTerminated);
185+
}
186+
176187
int estimateActiveExecutorCount() {
177188
return activeSequentialTasks.size();
178189
}

src/main/java/conseq4j/summon/ConseqServiceFactory.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import lombok.NonNull;
2727
import lombok.ToString;
2828
import lombok.experimental.Delegate;
29+
import org.awaitility.Awaitility;
2930

3031
import javax.annotation.Nonnull;
3132
import javax.annotation.concurrent.ThreadSafe;
@@ -49,7 +50,7 @@
4950

5051
@ThreadSafe
5152
@ToString
52-
public final class ConseqServiceFactory implements SequentialExecutorServiceFactory {
53+
public final class ConseqServiceFactory implements SequentialExecutorServiceFactory, AutoCloseable {
5354
private final int concurrency;
5455
private final ConcurrentMap<Object, ShutdownDisabledExecutorService> sequentialExecutors;
5556

@@ -110,6 +111,12 @@ public List<Runnable> shutdownNow() {
110111
.collect(Collectors.toList());
111112
}
112113

114+
@Override
115+
public void close() {
116+
this.shutdown();
117+
Awaitility.await().forever().until(this::isTerminated);
118+
}
119+
113120
private int bucketOf(Object sequenceKey) {
114121
return floorMod(Objects.hash(sequenceKey), this.concurrency);
115122
}

0 commit comments

Comments
 (0)