Skip to content

Allow to configure retry policy on a per-task basis #790

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,48 @@ However, if you completely trust your serialized data (for example, your develop

See [transaction-outbox-jackson](transactionoutbox-jackson/README.md), which uses a specially-configured Jackson `ObjectMapper` to achieve this.

### Flexible retry policy

The default retry policy for tasks is configured using:
```
TransactionOutbox outbox = TransactionOutbox.builder()
...
// 10 attempts at a task before blocking it.
.blockAfterAttempts(10)
// Flush once every 15 minutes only
.attemptFrequency(Duration.ofMinutes(15))
.build();
```
and suitable for most use cases.

If you need to override the default retry policy on a per-task basis, you can do so by implementing the `RetryPolicyAware` interface in the class you pass to `outbox.schedule()`:
```
public class RetryPolicyAwareService implements RetryPolicyAware {
@Override
public Duration waitDuration(int attempt, Throwable throwable) {
Duration initialInterval = Duration.ofMillis(100);
// Exponential backoff using IntervalFunction from resilience4j-core
long waitDurationMillis = IntervalFunction.ofExponentialBackoff(initialInterval).apply(attempt);
return Duration.ofMillis(waitDurationMillis);
}

@Override
public int blockAfterAttempts(int attempt, Throwable throwable) {
// Stop retrying and block outbox entry immediately on ServerError
if (throwable instanceof ServerError) {
return 0;
}
return 3;
}

public void callExternalService() {
// ...
}
}

outbox.schedule(RetryPolicyAwareService.class).callExternalService();
```

### Clustering

The default mechanism for _running_ tasks (either immediately, or when they are picked up by background processing) is via a `java.concurrent.Executor`, which effectively does the following:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.gruelbox.transactionoutbox;

import java.time.Duration;

/**
* Defines a custom retry policy for tasks scheduled in the {@link TransactionOutbox}.
*
* <p>Implement this interface in the class that is passed to {@link
* TransactionOutbox#schedule(Class)} to override the default retry behavior.
*/
public interface RetryPolicyAware {
/**
* Determines the wait duration before retrying a failed task.
*
* @param attempt The current retry attempt (starting from 1).
* @param throwable The exception that caused the failure.
* @return The duration to wait before the next retry.
*/
Duration waitDuration(int attempt, Throwable throwable);

/**
* Specifies the maximum number of retry attempts before blocking the task.
*
* @param attempt The current retry attempt (starting from 1).
* @param throwable The exception that caused the failure.
* @return The number of attempts after which the task should be blocked. If the returned value is
* less than or equal to {@code attempt}, the task is blocked immediately.
*/
int blockAfterAttempts(int attempt, Throwable throwable);
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ private boolean doFlush(Function<Transaction, Collection<TransactionOutboxEntry>
for (var entry : entries) {
log.debug("Triggering {}", entry.description());
try {
pushBack(transaction, entry);
pushBack(transaction, entry, attemptFrequency);
result.add(entry);
} catch (OptimisticLockException e) {
log.debug("Beaten to optimistic lock on {}", entry.description());
Expand Down Expand Up @@ -286,6 +286,7 @@ private void submitNow(TransactionOutboxEntry entry) {
public void processNow(TransactionOutboxEntry entry) {
initialize();
Boolean success = null;
InvocationInstanceHolder invocationInstanceHolder = new InvocationInstanceHolder();
try {
success =
transactionManager.inTransactionReturnsThrows(
Expand All @@ -298,7 +299,8 @@ public void processNow(TransactionOutboxEntry entry) {
.withinMDC(
() -> {
log.info("Processing {}", entry.description());
invoke(entry, tx);
invocationInstanceHolder.instance = getInvocationInstance(entry);
invoke(invocationInstanceHolder.instance, entry, tx);
if (entry.getUniqueRequestId() == null) {
persistor.delete(tx, entry);
} else {
Expand All @@ -316,9 +318,9 @@ public void processNow(TransactionOutboxEntry entry) {
return true;
});
} catch (InvocationTargetException e) {
updateAttemptCount(entry, e.getCause());
updateAttemptCount(entry, e.getCause(), invocationInstanceHolder.getAsRetryPolicyAware());
} catch (Exception e) {
updateAttemptCount(entry, e);
updateAttemptCount(entry, e, invocationInstanceHolder.getAsRetryPolicyAware());
}
if (success != null) {
if (success) {
Expand All @@ -330,13 +332,18 @@ public void processNow(TransactionOutboxEntry entry) {
}
}

private void invoke(TransactionOutboxEntry entry, Transaction transaction)
private Object getInvocationInstance(TransactionOutboxEntry entry) {
Object invocationInstance = instantiator.getInstance(entry.getInvocation().getClassName());
log.debug("Created instance {}", invocationInstance);
return invocationInstance;
}

private void invoke(
Object invocationInstance, TransactionOutboxEntry entry, Transaction transaction)
throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
Object instance = instantiator.getInstance(entry.getInvocation().getClassName());
log.debug("Created instance {}", instance);
transactionManager
.injectTransaction(entry.getInvocation(), transaction)
.invoke(instance, listener);
.invoke(invocationInstance, listener);
}

private TransactionOutboxEntry newEntry(
Expand All @@ -362,11 +369,12 @@ private TransactionOutboxEntry newEntry(
.build();
}

private void pushBack(Transaction transaction, TransactionOutboxEntry entry)
private void pushBack(
Transaction transaction, TransactionOutboxEntry entry, Duration waitDuration)
throws OptimisticLockException {
try {
entry.setLastAttemptTime(clockProvider.get().instant());
entry.setNextAttemptTime(after(attemptFrequency));
entry.setNextAttemptTime(after(waitDuration));
validator.validate(entry);
persistor.update(transaction, entry);
} catch (OptimisticLockException e) {
Expand All @@ -380,12 +388,23 @@ private Instant after(Duration duration) {
return clockProvider.get().instant().plus(duration).truncatedTo(MILLIS);
}

private void updateAttemptCount(TransactionOutboxEntry entry, Throwable cause) {
private void updateAttemptCount(
TransactionOutboxEntry entry, Throwable cause, RetryPolicyAware retryPolicyAware) {
try {
entry.setAttempts(entry.getAttempts() + 1);

int blockAfterAttempts =
retryPolicyAware == null
? this.blockAfterAttempts
: retryPolicyAware.blockAfterAttempts(entry.getAttempts(), cause);
Duration waitDuration =
retryPolicyAware == null
? this.attemptFrequency
: retryPolicyAware.waitDuration(entry.getAttempts(), cause);

var blocked = (entry.getTopic() == null) && (entry.getAttempts() >= blockAfterAttempts);
entry.setBlocked(blocked);
transactionManager.inTransactionThrows(tx -> pushBack(tx, entry));
transactionManager.inTransactionThrows(tx -> pushBack(tx, entry, waitDuration));
listener.failure(entry, cause);
if (blocked) {
log.error(
Expand Down Expand Up @@ -460,4 +479,16 @@ public <T> T schedule(Class<T> clazz) {
return TransactionOutboxImpl.this.schedule(clazz, uniqueRequestId, ordered, delayForAtLeast);
}
}

private static class InvocationInstanceHolder {
Object instance;

RetryPolicyAware getAsRetryPolicyAware() {
if (instance instanceof RetryPolicyAware) {
return (RetryPolicyAware) instance;
} else {
return null;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -476,16 +476,31 @@ public <T, E extends Exception> T requireTransactionReturns(
*/
@Test
final void retryBehaviour() throws Exception {
retryBehaviour(new FailingInstantiator(3), Duration.ofMillis(500));
}

/**
* Runs a piece of work which will fail several times before working successfully. Ensures that
* the work runs eventually. Uses RetryPolicyAware to configure retry policy.
*/
@Test
final void retryPolicyAwareRetryBehaviour() throws Exception {
retryBehaviour(
new FailingRetryPolicyAwareInstantiator(3, 3), Duration.ofHours(1) // should be ignored
);
}

private void retryBehaviour(FailingInstantiator instantiator, Duration attemptFrequency)
throws Exception {
TransactionManager transactionManager = txManager();
CountDownLatch latch = new CountDownLatch(1);
AtomicInteger attempts = new AtomicInteger();
TransactionOutbox outbox =
TransactionOutbox.builder()
.transactionManager(transactionManager)
.persistor(Persistor.forDialect(connectionDetails().dialect()))
.instantiator(new FailingInstantiator(attempts))
.instantiator(instantiator)
.submitter(Submitter.withExecutor(singleThreadPool))
.attemptFrequency(Duration.ofMillis(500))
.attemptFrequency(attemptFrequency)
.listener(new LatchListener(latch))
.build();

Expand Down Expand Up @@ -549,13 +564,12 @@ final void lastAttemptTime_updatesEveryTime() throws Exception {
TransactionManager transactionManager = txManager();
CountDownLatch successLatch = new CountDownLatch(1);
CountDownLatch blockLatch = new CountDownLatch(1);
AtomicInteger attempts = new AtomicInteger();
var orderedEntryListener = new OrderedEntryListener(successLatch, blockLatch);
TransactionOutbox outbox =
TransactionOutbox.builder()
.transactionManager(transactionManager)
.persistor(Persistor.forDialect(connectionDetails().dialect()))
.instantiator(new FailingInstantiator(attempts))
.instantiator(new FailingInstantiator(3))
.submitter(Submitter.withExecutor(singleThreadPool))
.attemptFrequency(Duration.ofMillis(500))
.listener(orderedEntryListener)
Expand Down Expand Up @@ -601,20 +615,36 @@ final void lastAttemptTime_updatesEveryTime() throws Exception {
*/
@Test
final void blockAndThenUnblockForRetry() throws Exception {
blockAndThenUnblockForRetry(new FailingInstantiator(3), 2);
}

/**
* Runs a piece of work which will fail enough times to enter a blocked state but will then pass
* when re-tried after it is unblocked. Uses RetryPolicyAware to configure the number of max
* attempts.
*/
@Test
final void retryPolicyAwareBlockAndThenUnblockForRetry() throws Exception {
blockAndThenUnblockForRetry(
new FailingRetryPolicyAwareInstantiator(3, 2), 100 // should be ignored
);
}

private void blockAndThenUnblockForRetry(FailingInstantiator instantiator, int blockAfterAttempts)
throws Exception {
TransactionManager transactionManager = txManager();
CountDownLatch successLatch = new CountDownLatch(1);
CountDownLatch blockLatch = new CountDownLatch(1);
LatchListener latchListener = new LatchListener(successLatch, blockLatch);
AtomicInteger attempts = new AtomicInteger();
TransactionOutbox outbox =
TransactionOutbox.builder()
.transactionManager(transactionManager)
.persistor(Persistor.forDialect(connectionDetails().dialect()))
.instantiator(new FailingInstantiator(attempts))
.instantiator(instantiator)
.submitter(Submitter.withExecutor(singleThreadPool))
.attemptFrequency(Duration.ofMillis(500))
.listener(latchListener)
.blockAfterAttempts(2)
.blockAfterAttempts(blockAfterAttempts)
.build();

clearOutbox();
Expand Down Expand Up @@ -695,10 +725,12 @@ protected String createTestTable() {

private static class FailingInstantiator implements Instantiator {

private final AtomicInteger attempts;
protected final int attemptsBeforeSuccess;

protected final AtomicInteger attempts = new AtomicInteger();

FailingInstantiator(AtomicInteger attempts) {
this.attempts = attempts;
private FailingInstantiator(int attemptsBeforeSuccess) {
this.attemptsBeforeSuccess = attemptsBeforeSuccess;
}

@Override
Expand All @@ -707,18 +739,60 @@ public String getName(Class<?> clazz) {
}

@Override
public Object getInstance(String name) {
public InterfaceProcessor getInstance(String name) {
if (!"BEEF".equals(name)) {
throw new UnsupportedOperationException();
}
return (InterfaceProcessor)
(foo, bar) -> {
LOGGER.info("Processing ({}, {})", foo, bar);
if (attempts.incrementAndGet() < 3) {
throw new RuntimeException("Temporary failure");
}
LOGGER.info("Processed ({}, {})", foo, bar);
};
return (foo, bar) -> {
LOGGER.info("Processing ({}, {})", foo, bar);
if (attempts.incrementAndGet() < attemptsBeforeSuccess) {
throw new RuntimeException("Temporary failure");
}
LOGGER.info("Processed ({}, {})", foo, bar);
};
}
}

private static class FailingRetryPolicyAwareInstantiator extends FailingInstantiator
implements Instantiator {

private final int blockAfterAttempts;

private FailingRetryPolicyAwareInstantiator(int attemptsBeforeSuccess, int blockAfterAttempts) {
super(attemptsBeforeSuccess);
this.blockAfterAttempts = blockAfterAttempts;
}

@Override
public InterfaceProcessor getInstance(String name) {
return new RetryPolicyAwareInterfaceProcessor(super.getInstance(name));
}

private class RetryPolicyAwareInterfaceProcessor
implements InterfaceProcessor, RetryPolicyAware {

private final InterfaceProcessor processor;

RetryPolicyAwareInterfaceProcessor(InterfaceProcessor processor) {
this.processor = processor;
}

@Override
public Duration waitDuration(int attempt, Throwable throwable) {
Duration waitDuration = Duration.ofMillis(100L * attempt);
LOGGER.info("Waiting {} for attempt {}", waitDuration, attempt);
return waitDuration;
}

@Override
public int blockAfterAttempts(int attempt, Throwable throwable) {
return blockAfterAttempts;
}

@Override
public void process(int foo, String bar) {
processor.process(foo, bar);
}
}
}

Expand Down