Skip to content

Provide ability to check status of a specific entry #540

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ private Class<?> toClass(ClassLoader classLoader, String name) {
try {
return classLoader != null ? Class.forName(name, false, classLoader) : Class.forName(name);
} catch (ClassNotFoundException e) {
throw new RuntimeException(
throw new IllegalArgumentException(
"Cannot determine array type for "
+ name
+ " using "
Expand All @@ -217,37 +217,41 @@ private Class<?> toClass(ClassLoader classLoader, String name) {
public JsonElement serialize(Invocation src, Type typeOfSrc, JsonSerializationContext context) {
if (version == 1) {
log.warn("Serializing as deprecated version {}", version);
return serializeV1(src, typeOfSrc, context);
return serializeV1(src, context);
}
JsonObject obj = new JsonObject();
obj.addProperty("c", src.getClassName());
obj.addProperty("m", src.getMethodName());
JsonArray params = new JsonArray();
JsonArray args = new JsonArray();
int i = 0;
for (Class<?> parameterType : src.getParameterTypes()) {
params.add(nameForClass(parameterType));
Object arg = src.getArgs()[i];
if (arg == null) {
JsonObject jsonObject = new JsonObject();
jsonObject.add("t", null);
jsonObject.add("v", null);
args.add(jsonObject);
} else {
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty("t", nameForClass(arg.getClass()));
jsonObject.add("v", context.serialize(arg));
args.add(jsonObject);
if (src.getParameterTypes() != null) {
JsonArray params = new JsonArray();
JsonArray args = new JsonArray();
int i = 0;
for (Class<?> parameterType : src.getParameterTypes()) {
params.add(nameForClass(parameterType));
Object arg = src.getArgs() == null ? null : src.getArgs()[i];
if (arg == null) {
JsonObject jsonObject = new JsonObject();
jsonObject.add("t", null);
jsonObject.add("v", null);
args.add(jsonObject);
} else {
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty("t", nameForClass(arg.getClass()));
jsonObject.add("v", context.serialize(arg));
args.add(jsonObject);
}
i++;
}
i++;
if (src.getArgs() != null) {
obj.add("a", args);
}
obj.add("p", params);
}
obj.add("p", params);
obj.add("a", args);
obj.add("x", context.serialize(src.getMdc()));
return obj;
}

JsonElement serializeV1(Invocation src, Type typeOfSrc, JsonSerializationContext context) {
JsonElement serializeV1(Invocation src, JsonSerializationContext context) {
JsonObject obj = new JsonObject();
obj.addProperty("c", src.getClassName());
obj.addProperty("m", src.getMethodName());
Expand All @@ -274,39 +278,46 @@ public Invocation deserialize(
String className = jsonObject.get("c").getAsString();
String methodName = jsonObject.get("m").getAsString();

JsonArray jsonParams = jsonObject.get("p").getAsJsonArray();
Class<?>[] params = new Class<?>[jsonParams.size()];
for (int i = 0; i < jsonParams.size(); i++) {
JsonElement param = jsonParams.get(i);
if (param.isJsonObject()) {
// For backwards compatibility
params[i] = classForName(param.getAsJsonObject().get("t").getAsString());
} else {
params[i] = classForName(param.getAsString());
Class<?>[] params = null;
if (jsonObject.has("p") || version == 1) {
JsonArray jsonParams = jsonObject.get("p").getAsJsonArray();
params = new Class<?>[jsonParams.size()];
for (int i = 0; i < jsonParams.size(); i++) {
JsonElement param = jsonParams.get(i);
if (param.isJsonObject()) {
// For backwards compatibility
params[i] = classForName(param.getAsJsonObject().get("t").getAsString());
} else {
params[i] = classForName(param.getAsString());
}
}
}

JsonElement argsElement = jsonObject.get("a");
if (argsElement == null) {
// For backwards compatibility
argsElement = jsonObject.get("p");
}
JsonArray jsonArgs = argsElement.getAsJsonArray();
Object[] args = new Object[jsonArgs.size()];
for (int i = 0; i < jsonArgs.size(); i++) {
JsonElement arg = jsonArgs.get(i);
JsonElement argType = arg.getAsJsonObject().get("t");
if (argType != null) {
JsonElement argValue = arg.getAsJsonObject().get("v");
Class<?> argClass = classForName(argType.getAsString());
try {
args[i] = context.deserialize(argValue, argClass);
} catch (Exception e) {
throw new RuntimeException(
"Failed to deserialize arg [" + argValue + "] of type [" + argType + "]", e);
Object[] args = null;
if (jsonObject.has("a") || version == 1) {
JsonElement argsElement = jsonObject.get("a");
if (argsElement == null) {
// For backwards compatibility
argsElement = jsonObject.get("p");
}
JsonArray jsonArgs = argsElement.getAsJsonArray();
args = new Object[jsonArgs.size()];
for (int i = 0; i < jsonArgs.size(); i++) {
JsonElement arg = jsonArgs.get(i);
JsonElement argType = arg.getAsJsonObject().get("t");
if (argType != null) {
JsonElement argValue = arg.getAsJsonObject().get("v");
Class<?> argClass = classForName(argType.getAsString());
try {
args[i] = context.deserialize(argValue, argClass);
} catch (Exception e) {
throw new IllegalArgumentException(
"Failed to deserialize arg [" + argValue + "] of type [" + argType + "]", e);
}
}
}
}

Map<String, String> mdc = context.deserialize(jsonObject.get("x"), Map.class);

return new Invocation(className, methodName, params, args, mdc);
Expand Down Expand Up @@ -450,7 +461,7 @@ public YearMonth read(JsonReader in) throws IOException {
}

static final class UtcDateTypeAdapter extends TypeAdapter<Date> {
private final TimeZone UTC_TIME_ZONE = TimeZone.getTimeZone("UTC");
private static final TimeZone UTC_TIME_ZONE = TimeZone.getTimeZone("UTC");

@Override
public void write(JsonWriter out, Date date) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,4 +427,17 @@ public boolean checkConnection(Transaction tx) throws SQLException {
return rs.next() && (rs.getInt(1) == 1);
}
}

@Override
public Optional<TransactionOutboxEntry> load(Transaction tx, String entryId) throws Exception {
//noinspection resource
try (PreparedStatement stmt =
tx.connection()
.prepareStatement("SELECT " + ALL_FIELDS + " FROM " + tableName + " WHERE id = ?")) {
stmt.setString(1, entryId);
List<TransactionOutboxEntry> results = new ArrayList<>(1);
gatherResults(stmt, results);
return results.stream().findFirst();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,4 +150,13 @@ List<TransactionOutboxEntry> selectBatch(Transaction tx, int batchSize, Instant
* @param tx The current {@link Transaction}.
*/
void clear(Transaction tx) throws Exception;

/**
* Loads the specified entry.
*
* @param tx The current {@link Transaction}.
* @param entryId The entry id.
* @return The entry, or empty if it does not exist.
*/
Optional<TransactionOutboxEntry> load(Transaction tx, String entryId) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ public int deleteProcessedAndExpired(Transaction tx, int batchSize, Instant now)
@Override
public void clear(Transaction tx) {}

@Override
public Optional<TransactionOutboxEntry> load(Transaction tx, String entryId) {
return Optional.empty();
}

@Override
public boolean checkConnection(Transaction tx) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import java.time.Clock;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Supplier;
import lombok.ToString;
import org.slf4j.MDC;
Expand Down Expand Up @@ -135,6 +137,51 @@ default boolean flush() {
@SuppressWarnings("WeakerAccess")
void processNow(TransactionOutboxEntry entry);

/**
* Loads the specified queued task. This can be used to check the status.
*
* <p>Requires the transaction manager in use to be a {@link
* ThreadLocalContextTransactionManager}.
*
* <p>Note that nothing may be returned for four distinct reasons:
*
* <ul>
* <li>Such a task has never been created and is therefore unknown
* <li>The task was created, and successfully completed, but had no {@code uniqueRequestId} so
* was deleted.
* <li>The task was created, and completed, and had a {@code uniqueRequestId}, but the {@code
* retentionThreshold} has passed so the record has been deleted
* <li>The task was created, and may have succeeded or failed but was deleted from the database
* by a user for some reason.
* </ul>
*
* @param entryId The entry id.
* @return Empty if no such entry exists, otherwise the entry.
*/
Optional<TransactionOutboxEntry> fetchEntry(String entryId);

/**
* Loads the specified queued task. This can be used to check the status.
*
* <p>Note that nothing may be returned for four distinct reasons:
*
* <ul>
* <li>Such a task has never been created and is therefore unknown
* <li>The task was created, and successfully completed, but had no {@code uniqueRequestId} so
* was deleted.
* <li>The task was created, and completed, and had a {@code uniqueRequestId}, but the {@code
* retentionThreshold} has passed so the record has been deleted
* <li>The task was created, and may have succeeded or failed but was deleted from the database
* by a user for some reason.
* </ul>
*
* @param entryId The entry id.
* @param transactionContext The transaction context (if using a {@link
* ParameterContextTransactionManager}).
* @return Empty if no such entry exists, otherwise the entry.
*/
Optional<TransactionOutboxEntry> fetchEntry(String entryId, Object transactionContext);

/** Builder for {@link TransactionOutbox}. */
@ToString
abstract class TransactionOutboxBuilder {
Expand Down Expand Up @@ -368,6 +415,22 @@ interface ParameterizedScheduleBuilder {
*/
ParameterizedScheduleBuilder ordered(String topic);

/**
* If set, {@code callback} will be called in the current thread immediately after writing the
* entry to the database, so that the details of the record can be inspected or the {@code id}
* retained to be able to check the status with {@link TransactionOutbox#fetchEntry(String)}.
*
* <p>Note that a successful callback does not necessarily mean that the entry has, or will
* ever, be processed. The current transaction could be rolled back rather than committed. If
* this occurs, {@link TransactionOutbox#fetchEntry(String)} will simply return empty.
*
* @param callback Receives the {@link TransactionOutboxEntry} once it has been inserted into
* the database (though this insert may be rolled back and will not be processed if this
* happens).
* @return Builder.
*/
ParameterizedScheduleBuilder entryCallback(Consumer<TransactionOutboxEntry> callback);

/**
* Equivalent to {@link TransactionOutbox#schedule(Class)}, but applying additional parameters
* to the request as configured using {@link TransactionOutbox#with()}.
Expand Down
Loading