Skip to content

Allow TransactionOutboxImpl#scheduler and ExecutorSubmitter#executor programmatic shutdown #687

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
import com.gruelbox.transactionoutbox.spi.Utils;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import lombok.Builder;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.event.Level;

Expand Down Expand Up @@ -37,23 +40,38 @@
* </ul>
*/
@Slf4j
@Builder
public class ExecutorSubmitter implements Submitter, Validatable {

/**
* @param executor The executor to use.
*/
@SuppressWarnings("JavaDoc")
private final Executor executor;

/**
* @param logLevelWorkQueueSaturation The log level to use when work submission hits the executor
* queue limit. This usually indicates saturation and may be of greater interest than the
* default {@code DEBUG} level.
*/
private final boolean shutdownExecutorOnClose;

@SuppressWarnings("JavaDoc")
@Builder.Default
private final Level logLevelWorkQueueSaturation = Level.DEBUG;
private final Level logLevelWorkQueueSaturation;

ExecutorSubmitter(Executor executor, Level logLevelWorkQueueSaturation) {
if (executor != null) {
this.executor = executor;
shutdownExecutorOnClose = false;
} else {
// JDK bug means this warning can't be fixed
//noinspection Convert2Diamond
this.executor =
new ThreadPoolExecutor(
1,
Math.max(1, ForkJoinPool.commonPool().getParallelism()),
0L,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(16384));
shutdownExecutorOnClose = true;
}
this.logLevelWorkQueueSaturation = logLevelWorkQueueSaturation;
}

public static ExecutorSubmitterBuilder builder() {
return new ExecutorSubmitterBuilder();
}

@Override
public void submit(TransactionOutboxEntry entry, Consumer<TransactionOutboxEntry> localExecutor) {
Expand All @@ -79,4 +97,60 @@ public void validate(Validator validator) {
validator.notNull("executor", executor);
validator.notNull("logLevelWorkQueueSaturation", logLevelWorkQueueSaturation);
}

@Override
public void close() {
if (!shutdownExecutorOnClose) {
return;
}
if (!(executor instanceof ExecutorService)) {
return;
}
Utils.shutdown((ExecutorService) executor);
}

public static class ExecutorSubmitterBuilder {
private Executor executor;
private Level logLevelWorkQueueSaturation;
private boolean logLevelWorkQueueSaturationSet;

ExecutorSubmitterBuilder() {}

/**
* @param executor The executor to use. If not provided, a {@link ThreadPoolExecutor}, sized to
* match {@link * ForkJoinPool#commonPool()} (or one thread, whichever is the larger), with
* a maximum queue size * of 16384 will be used.
*/
public ExecutorSubmitterBuilder executor(Executor executor) {
this.executor = executor;
return this;
}

/**
* @param logLevelWorkQueueSaturation The log level to use when work submission hits the
* executor queue limit. This usually indicates saturation and may be of greater interest
* than the default {@code DEBUG} level.
*/
public ExecutorSubmitterBuilder logLevelWorkQueueSaturation(Level logLevelWorkQueueSaturation) {
this.logLevelWorkQueueSaturation = logLevelWorkQueueSaturation;
logLevelWorkQueueSaturationSet = true;
return this;
}

public ExecutorSubmitter build() {
Level logLevelWorkQueueSaturationToUse = logLevelWorkQueueSaturation;
if (!logLevelWorkQueueSaturationSet) {
logLevelWorkQueueSaturationToUse = Level.DEBUG;
}
return new ExecutorSubmitter(this.executor, logLevelWorkQueueSaturationToUse);
}

public String toString() {
return "ExecutorSubmitter.ExecutorSubmitterBuilder(executor="
+ this.executor
+ ", logLevelWorkQueueSaturation$value="
+ logLevelWorkQueueSaturation
+ ")";
}
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
package com.gruelbox.transactionoutbox;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Called by {@link TransactionOutbox} to submit work for background processing. */
public interface Submitter {
public interface Submitter extends AutoCloseable {

/**
* Schedules background work using a local {@link Executor} implementation.
Expand All @@ -30,15 +28,7 @@ static Submitter withExecutor(Executor executor) {
* @return The submitter.
*/
static Submitter withDefaultExecutor() {
// JDK bug means this warning can't be fixed
//noinspection Convert2Diamond
return withExecutor(
new ThreadPoolExecutor(
1,
Math.max(1, ForkJoinPool.commonPool().getParallelism()),
0L,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(16384)));
return ExecutorSubmitter.builder().build();
}
Copy link
Contributor Author

@reda-alaoui reda-alaoui Aug 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@badgerwithagun ExecutorSubmitter does not create an executor, but Submitter does here. AutoCloseable was originally added to Submitter because of this.

Copy link
Member

@badgerwithagun badgerwithagun Aug 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But you don't need to use this method.

You can just create a TransactionOutbox with your own.

var outbox = TransactionOutbox.builder()
   .submitter(ExecutorSubmitter.builder().executor(yourExecutor).build())
   ...
   .builder();

This is almost always what you do in a managed environment. The default Submitter implementation uses ForkJoinPool, which is a very bad idea when running in an application container.


/**
Expand Down Expand Up @@ -72,4 +62,10 @@ static Submitter withDefaultExecutor() {
* just a call to {@link TransactionOutbox#processNow(TransactionOutboxEntry)}).
*/
void submit(TransactionOutboxEntry entry, Consumer<TransactionOutboxEntry> localExecutor);

/**
* Releases any releasable resource. The instance will become unusable after calling this method.
*/
@Override
default void close() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* pattern for Java. See <a href="https://github.com/gruelbox/transaction-outbox">README</a> for
* usage instructions.
*/
public interface TransactionOutbox {
public interface TransactionOutbox extends AutoCloseable {

/**
* @return A builder for creating a new instance of {@link TransactionOutbox}.
Expand Down Expand Up @@ -135,6 +135,10 @@ default boolean flush() {
@SuppressWarnings("WeakerAccess")
void processNow(TransactionOutboxEntry entry);

/** Releases any releasable resource. The instance is unusable after calling this method. */
@Override
default void close() {}

/** Builder for {@link TransactionOutbox}. */
@ToString
abstract class TransactionOutboxBuilder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.Accessors;
Expand All @@ -33,13 +31,13 @@
import org.slf4j.event.Level;

@Slf4j
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
final class TransactionOutboxImpl implements TransactionOutbox, Validatable {

private final TransactionManager transactionManager;
private final Persistor persistor;
private final Instantiator instantiator;
private final Submitter submitter;
private final boolean closeSubmitterOnClose;
private final Duration attemptFrequency;
private final Level logLevelTemporaryFailure;
private final int blockAfterAttempts;
Expand All @@ -53,6 +51,41 @@ final class TransactionOutboxImpl implements TransactionOutbox, Validatable {
private final ProxyFactory proxyFactory = new ProxyFactory();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

private TransactionOutboxImpl(
TransactionManager transactionManager,
Persistor persistor,
Instantiator instantiator,
Submitter submitter,
Duration attemptFrequency,
Level logLevelTemporaryFailure,
int blockAfterAttempts,
int flushBatchSize,
Supplier<Clock> clockProvider,
TransactionOutboxListener listener,
boolean serializeMdc,
Validator validator,
Duration retentionThreshold) {
this.transactionManager = transactionManager;
this.persistor = persistor;
this.instantiator = instantiator;
if (submitter != null) {
this.submitter = submitter;
closeSubmitterOnClose = false;
} else {
this.submitter = Submitter.withDefaultExecutor();
closeSubmitterOnClose = true;
}
this.attemptFrequency = attemptFrequency;
this.logLevelTemporaryFailure = logLevelTemporaryFailure;
this.blockAfterAttempts = blockAfterAttempts;
this.flushBatchSize = flushBatchSize;
this.clockProvider = clockProvider;
this.listener = listener;
this.serializeMdc = serializeMdc;
this.validator = validator;
this.retentionThreshold = retentionThreshold;
}

@Override
public void validate(Validator validator) {
validator.notNull("transactionManager", transactionManager);
Expand Down Expand Up @@ -412,6 +445,14 @@ private void updateAttemptCount(TransactionOutboxEntry entry, Throwable cause) {
}
}

@Override
public void close() {
if (closeSubmitterOnClose) {
submitter.close();
}
Utils.shutdown(scheduler);
}

@ToString
static class TransactionOutboxBuilderImpl extends TransactionOutboxBuilder {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import com.gruelbox.transactionoutbox.UncheckedException;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -106,4 +108,23 @@ public static void logAtLevel(Logger logger, Level level, String message, Object
break;
}
}

public static void shutdown(ExecutorService executorService) {
executorService.shutdownNow();
boolean terminated;
try {
terminated = executorService.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
if (terminated) {
return;
}
logAtLevel(
log,
Level.WARN,
"At least one task didn't stop within 5 seconds following the shutdown signal sent to {}",
executorService);
}
}
Loading