Skip to content

Commit 87aedea

Browse files
committed
+ fixed close for ConseqServiceFactory
1 parent a648fb0 commit 87aedea

File tree

3 files changed

+66
-12
lines changed

3 files changed

+66
-12
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
<modelVersion>4.0.0</modelVersion>
2828
<groupId>io.github.q3769</groupId>
2929
<artifactId>conseq4j</artifactId>
30-
<version>20230924.0.0</version>
30+
<version>20230924.0.20230927</version>
3131
<packaging>jar</packaging>
3232
<name>conseq4j</name>
3333
<description>A Java concurrent API to sequence related tasks while concurring unrelated ones</description>

src/main/java/conseq4j/summon/ConseqServiceFactory.java

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,7 @@
3131
import javax.annotation.concurrent.ThreadSafe;
3232
import java.util.List;
3333
import java.util.Objects;
34-
import java.util.concurrent.ConcurrentHashMap;
35-
import java.util.concurrent.ConcurrentMap;
36-
import java.util.concurrent.ExecutorService;
37-
import java.util.concurrent.Executors;
34+
import java.util.concurrent.*;
3835

3936
import static java.lang.Math.floorMod;
4037

@@ -67,16 +64,16 @@ private ConseqServiceFactory(int concurrency) {
6764
/**
6865
* @return ExecutorService factory with default concurrency
6966
*/
70-
public static ConseqServiceFactory instance() {
71-
return new ConseqServiceFactory(Runtime.getRuntime().availableProcessors());
67+
public static @Nonnull ConseqServiceFactory instance() {
68+
return instance(Runtime.getRuntime().availableProcessors());
7269
}
7370

7471
/**
7572
* @param concurrency
7673
* max number of tasks possible to be executed in parallel
7774
* @return ExecutorService factory with given concurrency
7875
*/
79-
public static ConseqServiceFactory instance(int concurrency) {
76+
public static @Nonnull ConseqServiceFactory instance(int concurrency) {
8077
return new ConseqServiceFactory(concurrency);
8178
}
8279

@@ -86,12 +83,12 @@ public static ConseqServiceFactory instance(int concurrency) {
8683
@Override
8784
public ExecutorService getExecutorService(@NonNull Object sequenceKey) {
8885
return this.sequentialExecutors.computeIfAbsent(bucketOf(sequenceKey),
89-
bucket -> new ShutdownDisabledExecutorService(Executors.newFixedThreadPool(1)));
86+
bucket -> new ShutdownDisabledExecutorService(Executors.newSingleThreadExecutor()));
9087
}
9188

9289
@Override
9390
public void close() {
94-
sequentialExecutors.values().forEach(ExecutorService::close);
91+
sequentialExecutors.values().forEach(ShutdownDisabledExecutorService::closeDelegate);
9592
}
9693

9794
private int bucketOf(Object sequenceKey) {
@@ -137,6 +134,32 @@ public void shutdown() {
137134
throw new UnsupportedOperationException(SHUTDOWN_UNSUPPORTED_MESSAGE);
138135
}
139136

137+
@Override
138+
public void close() {
139+
throw new UnsupportedOperationException(SHUTDOWN_UNSUPPORTED_MESSAGE);
140+
}
141+
142+
void closeDelegate() {
143+
boolean terminated = isTerminated();
144+
if (!terminated) {
145+
shutdownDelegate();
146+
boolean interrupted = false;
147+
while (!terminated) {
148+
try {
149+
terminated = awaitTermination(1L, TimeUnit.DAYS);
150+
} catch (InterruptedException e) {
151+
if (!interrupted) {
152+
shutdownDelegateNow();
153+
interrupted = true;
154+
}
155+
}
156+
}
157+
if (interrupted) {
158+
Thread.currentThread().interrupt();
159+
}
160+
}
161+
}
162+
140163
void shutdownDelegate() {
141164
this.delegate.shutdown();
142165
}
@@ -153,6 +176,8 @@ private interface ShutdownOperations {
153176
void shutdown();
154177

155178
List<Runnable> shutdownNow();
179+
180+
void close();
156181
}
157182
}
158183
}

src/test/java/conseq4j/summon/ConseqServiceFactoryTest.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,20 +26,21 @@
2626
import com.google.common.collect.Range;
2727
import conseq4j.SpyingTask;
2828
import conseq4j.TestUtils;
29+
import org.junit.jupiter.api.Nested;
2930
import org.junit.jupiter.api.Test;
3031

3132
import java.util.List;
3233
import java.util.UUID;
3334
import java.util.concurrent.Callable;
3435
import java.util.concurrent.ExecutionException;
36+
import java.util.concurrent.ExecutorService;
3537
import java.util.concurrent.Future;
3638
import java.util.stream.Collectors;
3739

3840
import static conseq4j.TestUtils.createSpyingTasks;
3941
import static conseq4j.TestUtils.getIfAllCompleteNormal;
4042
import static java.util.stream.Collectors.toList;
41-
import static org.junit.jupiter.api.Assertions.assertEquals;
42-
import static org.junit.jupiter.api.Assertions.assertTrue;
43+
import static org.junit.jupiter.api.Assertions.*;
4344

4445
/**
4546
* @author Qingtian Wang
@@ -132,4 +133,32 @@ void submitsRunAllTasksOfSameSequenceKeyInSequence() {
132133
TestUtils.awaitAllComplete(tasks);
133134
assertSingleThread(tasks);
134135
}
136+
137+
@Nested
138+
class individualShutdownUnsupported {
139+
@Test
140+
void whenShutdownsCalled() {
141+
ExecutorService sequentialExecutor = ConseqServiceFactory.instance().getExecutorService(UUID.randomUUID());
142+
143+
assertThrows(UnsupportedOperationException.class, sequentialExecutor::shutdown);
144+
assertThrows(UnsupportedOperationException.class, sequentialExecutor::shutdownNow);
145+
assertThrows(UnsupportedOperationException.class, sequentialExecutor::close);
146+
assertFalse(sequentialExecutor.isShutdown());
147+
assertFalse(sequentialExecutor.isTerminated());
148+
}
149+
}
150+
151+
@Nested
152+
class factoryClose {
153+
@Test
154+
void whenCloseCalled() {
155+
ConseqServiceFactory conseqServiceFactory = ConseqServiceFactory.instance();
156+
ExecutorService sequentialExecutor = conseqServiceFactory.getExecutorService(UUID.randomUUID());
157+
158+
conseqServiceFactory.close();
159+
160+
assertTrue(sequentialExecutor.isShutdown());
161+
assertTrue(sequentialExecutor.isTerminated());
162+
}
163+
}
135164
}

0 commit comments

Comments
 (0)