Skip to content

Commit 8b649de

Browse files
committed
Allow TransactionOutbox#scheduler shutdown
1 parent 1253bb7 commit 8b649de

File tree

5 files changed

+164
-28
lines changed

5 files changed

+164
-28
lines changed

transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/ExecutorSubmitter.java

Lines changed: 86 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,12 @@
33
import com.gruelbox.transactionoutbox.spi.Utils;
44
import java.util.concurrent.ArrayBlockingQueue;
55
import java.util.concurrent.Executor;
6+
import java.util.concurrent.ExecutorService;
7+
import java.util.concurrent.ForkJoinPool;
68
import java.util.concurrent.RejectedExecutionException;
9+
import java.util.concurrent.ThreadPoolExecutor;
10+
import java.util.concurrent.TimeUnit;
711
import java.util.function.Consumer;
8-
import lombok.Builder;
912
import lombok.extern.slf4j.Slf4j;
1013
import org.slf4j.event.Level;
1114

@@ -37,23 +40,38 @@
3740
* </ul>
3841
*/
3942
@Slf4j
40-
@Builder
4143
public class ExecutorSubmitter implements Submitter, Validatable {
4244

43-
/**
44-
* @param executor The executor to use.
45-
*/
4645
@SuppressWarnings("JavaDoc")
4746
private final Executor executor;
4847

49-
/**
50-
* @param logLevelWorkQueueSaturation The log level to use when work submission hits the executor
51-
* queue limit. This usually indicates saturation and may be of greater interest than the
52-
* default {@code DEBUG} level.
53-
*/
48+
private final boolean shutdownExecutorOnClose;
49+
5450
@SuppressWarnings("JavaDoc")
55-
@Builder.Default
56-
private final Level logLevelWorkQueueSaturation = Level.DEBUG;
51+
private final Level logLevelWorkQueueSaturation;
52+
53+
ExecutorSubmitter(Executor executor, Level logLevelWorkQueueSaturation) {
54+
if (executor != null) {
55+
this.executor = executor;
56+
shutdownExecutorOnClose = false;
57+
} else {
58+
// JDK bug means this warning can't be fixed
59+
//noinspection Convert2Diamond
60+
this.executor =
61+
new ThreadPoolExecutor(
62+
1,
63+
Math.max(1, ForkJoinPool.commonPool().getParallelism()),
64+
0L,
65+
TimeUnit.MILLISECONDS,
66+
new ArrayBlockingQueue<>(16384));
67+
shutdownExecutorOnClose = true;
68+
}
69+
this.logLevelWorkQueueSaturation = logLevelWorkQueueSaturation;
70+
}
71+
72+
public static ExecutorSubmitterBuilder builder() {
73+
return new ExecutorSubmitterBuilder();
74+
}
5775

5876
@Override
5977
public void submit(TransactionOutboxEntry entry, Consumer<TransactionOutboxEntry> localExecutor) {
@@ -79,4 +97,60 @@ public void validate(Validator validator) {
7997
validator.notNull("executor", executor);
8098
validator.notNull("logLevelWorkQueueSaturation", logLevelWorkQueueSaturation);
8199
}
100+
101+
@Override
102+
public void close() {
103+
if (!shutdownExecutorOnClose) {
104+
return;
105+
}
106+
if (!(executor instanceof ExecutorService)) {
107+
return;
108+
}
109+
Utils.shutdown((ExecutorService) executor);
110+
}
111+
112+
public static class ExecutorSubmitterBuilder {
113+
private Executor executor;
114+
private Level logLevelWorkQueueSaturation;
115+
private boolean logLevelWorkQueueSaturationSet;
116+
117+
ExecutorSubmitterBuilder() {}
118+
119+
/**
120+
* @param executor The executor to use. If not provided, a {@link ThreadPoolExecutor}, sized to
121+
* match {@link * ForkJoinPool#commonPool()} (or one thread, whichever is the larger), with
122+
* a maximum queue size * of 16384 will be used.
123+
*/
124+
public ExecutorSubmitterBuilder executor(Executor executor) {
125+
this.executor = executor;
126+
return this;
127+
}
128+
129+
/**
130+
* @param logLevelWorkQueueSaturation The log level to use when work submission hits the
131+
* executor queue limit. This usually indicates saturation and may be of greater interest
132+
* than the default {@code DEBUG} level.
133+
*/
134+
public ExecutorSubmitterBuilder logLevelWorkQueueSaturation(Level logLevelWorkQueueSaturation) {
135+
this.logLevelWorkQueueSaturation = logLevelWorkQueueSaturation;
136+
logLevelWorkQueueSaturationSet = true;
137+
return this;
138+
}
139+
140+
public ExecutorSubmitter build() {
141+
Level logLevelWorkQueueSaturationToUse = logLevelWorkQueueSaturation;
142+
if (!logLevelWorkQueueSaturationSet) {
143+
logLevelWorkQueueSaturationToUse = Level.DEBUG;
144+
}
145+
return new ExecutorSubmitter(this.executor, logLevelWorkQueueSaturationToUse);
146+
}
147+
148+
public String toString() {
149+
return "ExecutorSubmitter.ExecutorSubmitterBuilder(executor="
150+
+ this.executor
151+
+ ", logLevelWorkQueueSaturation$value="
152+
+ logLevelWorkQueueSaturation
153+
+ ")";
154+
}
155+
}
82156
}

transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Submitter.java

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
11
package com.gruelbox.transactionoutbox;
22

3-
import java.util.concurrent.ArrayBlockingQueue;
43
import java.util.concurrent.Executor;
54
import java.util.concurrent.ForkJoinPool;
65
import java.util.concurrent.ThreadPoolExecutor;
7-
import java.util.concurrent.TimeUnit;
86
import java.util.function.Consumer;
97

108
/** Called by {@link TransactionOutbox} to submit work for background processing. */
11-
public interface Submitter {
9+
public interface Submitter extends AutoCloseable {
1210

1311
/**
1412
* Schedules background work using a local {@link Executor} implementation.
@@ -30,15 +28,7 @@ static Submitter withExecutor(Executor executor) {
3028
* @return The submitter.
3129
*/
3230
static Submitter withDefaultExecutor() {
33-
// JDK bug means this warning can't be fixed
34-
//noinspection Convert2Diamond
35-
return withExecutor(
36-
new ThreadPoolExecutor(
37-
1,
38-
Math.max(1, ForkJoinPool.commonPool().getParallelism()),
39-
0L,
40-
TimeUnit.MILLISECONDS,
41-
new ArrayBlockingQueue<Runnable>(16384)));
31+
return ExecutorSubmitter.builder().build();
4232
}
4333

4434
/**
@@ -72,4 +62,10 @@ static Submitter withDefaultExecutor() {
7262
* just a call to {@link TransactionOutbox#processNow(TransactionOutboxEntry)}).
7363
*/
7464
void submit(TransactionOutboxEntry entry, Consumer<TransactionOutboxEntry> localExecutor);
65+
66+
/**
67+
* Releases any releasable resource. The instance will become unusable after calling this method.
68+
*/
69+
@Override
70+
default void close() {}
7571
}

transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutbox.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* pattern for Java. See <a href="https://github.com/gruelbox/transaction-outbox">README</a> for
1515
* usage instructions.
1616
*/
17-
public interface TransactionOutbox {
17+
public interface TransactionOutbox extends AutoCloseable {
1818

1919
/**
2020
* @return A builder for creating a new instance of {@link TransactionOutbox}.
@@ -135,6 +135,10 @@ default boolean flush() {
135135
@SuppressWarnings("WeakerAccess")
136136
void processNow(TransactionOutboxEntry entry);
137137

138+
/** Releases any releasable resource. The instance is unusable after calling this method. */
139+
@Override
140+
default void close() {}
141+
138142
/** Builder for {@link TransactionOutbox}. */
139143
@ToString
140144
abstract class TransactionOutboxBuilder {

transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@
2323
import java.util.concurrent.atomic.AtomicBoolean;
2424
import java.util.function.Function;
2525
import java.util.function.Supplier;
26-
import lombok.AccessLevel;
27-
import lombok.RequiredArgsConstructor;
2826
import lombok.Setter;
2927
import lombok.ToString;
3028
import lombok.experimental.Accessors;
@@ -33,13 +31,13 @@
3331
import org.slf4j.event.Level;
3432

3533
@Slf4j
36-
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
3734
final class TransactionOutboxImpl implements TransactionOutbox, Validatable {
3835

3936
private final TransactionManager transactionManager;
4037
private final Persistor persistor;
4138
private final Instantiator instantiator;
4239
private final Submitter submitter;
40+
private final boolean closeSubmitterOnClose;
4341
private final Duration attemptFrequency;
4442
private final Level logLevelTemporaryFailure;
4543
private final int blockAfterAttempts;
@@ -53,6 +51,41 @@ final class TransactionOutboxImpl implements TransactionOutbox, Validatable {
5351
private final ProxyFactory proxyFactory = new ProxyFactory();
5452
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
5553

54+
private TransactionOutboxImpl(
55+
TransactionManager transactionManager,
56+
Persistor persistor,
57+
Instantiator instantiator,
58+
Submitter submitter,
59+
Duration attemptFrequency,
60+
Level logLevelTemporaryFailure,
61+
int blockAfterAttempts,
62+
int flushBatchSize,
63+
Supplier<Clock> clockProvider,
64+
TransactionOutboxListener listener,
65+
boolean serializeMdc,
66+
Validator validator,
67+
Duration retentionThreshold) {
68+
this.transactionManager = transactionManager;
69+
this.persistor = persistor;
70+
this.instantiator = instantiator;
71+
if (submitter != null) {
72+
this.submitter = submitter;
73+
closeSubmitterOnClose = false;
74+
} else {
75+
this.submitter = Submitter.withDefaultExecutor();
76+
closeSubmitterOnClose = true;
77+
}
78+
this.attemptFrequency = attemptFrequency;
79+
this.logLevelTemporaryFailure = logLevelTemporaryFailure;
80+
this.blockAfterAttempts = blockAfterAttempts;
81+
this.flushBatchSize = flushBatchSize;
82+
this.clockProvider = clockProvider;
83+
this.listener = listener;
84+
this.serializeMdc = serializeMdc;
85+
this.validator = validator;
86+
this.retentionThreshold = retentionThreshold;
87+
}
88+
5689
@Override
5790
public void validate(Validator validator) {
5891
validator.notNull("transactionManager", transactionManager);
@@ -412,6 +445,14 @@ private void updateAttemptCount(TransactionOutboxEntry entry, Throwable cause) {
412445
}
413446
}
414447

448+
@Override
449+
public void close() {
450+
if (closeSubmitterOnClose) {
451+
submitter.close();
452+
}
453+
Utils.shutdown(scheduler);
454+
}
455+
415456
@ToString
416457
static class TransactionOutboxBuilderImpl extends TransactionOutboxBuilder {
417458

transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/spi/Utils.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
import com.gruelbox.transactionoutbox.UncheckedException;
55
import java.util.Arrays;
66
import java.util.concurrent.Callable;
7+
import java.util.concurrent.ExecutorService;
8+
import java.util.concurrent.TimeUnit;
79
import java.util.function.Supplier;
810
import java.util.stream.Collectors;
911
import lombok.extern.slf4j.Slf4j;
@@ -106,4 +108,23 @@ public static void logAtLevel(Logger logger, Level level, String message, Object
106108
break;
107109
}
108110
}
111+
112+
public static void shutdown(ExecutorService executorService) {
113+
executorService.shutdownNow();
114+
boolean terminated;
115+
try {
116+
terminated = executorService.awaitTermination(5, TimeUnit.SECONDS);
117+
} catch (InterruptedException e) {
118+
Thread.currentThread().interrupt();
119+
throw new RuntimeException(e);
120+
}
121+
if (terminated) {
122+
return;
123+
}
124+
logAtLevel(
125+
log,
126+
Level.WARN,
127+
"At least one task didn't stop within 5 seconds following the shutdown signal sent to {}",
128+
executorService);
129+
}
109130
}

0 commit comments

Comments
 (0)