1
1
package com .gruelbox .transactionoutbox ;
2
2
3
- import lombok .AccessLevel ;
4
- import lombok .AllArgsConstructor ;
5
- import lombok .Builder ;
6
- import lombok .experimental .SuperBuilder ;
7
- import lombok .extern .slf4j .Slf4j ;
8
-
9
3
import java .io .IOException ;
10
4
import java .io .Reader ;
11
5
import java .io .StringWriter ;
15
9
import java .util .ArrayList ;
16
10
import java .util .List ;
17
11
import java .util .Optional ;
12
+ import lombok .AccessLevel ;
13
+ import lombok .AllArgsConstructor ;
14
+ import lombok .Builder ;
15
+ import lombok .experimental .SuperBuilder ;
16
+ import lombok .extern .slf4j .Slf4j ;
18
17
19
18
/**
20
19
* The default {@link Persistor} for {@link TransactionOutbox}.
@@ -37,9 +36,9 @@ public class DefaultPersistor implements Persistor, Validatable {
37
36
38
37
/**
39
38
* @param writeLockTimeoutSeconds How many seconds to wait before timing out on obtaining a write
40
- * lock. There's no point making this long; it's always better to just back off as quickly as
41
- * possible and try another record. Generally these lock timeouts only kick in if {@link
42
- * Dialect#isSupportsSkipLock()} is false.
39
+ * lock. There's no point making this long; it's always better to just back off as quickly as
40
+ * possible and try another record. Generally these lock timeouts only kick in if {@link
41
+ * Dialect#isSupportsSkipLock()} is false.
43
42
*/
44
43
@ SuppressWarnings ("JavaDoc" )
45
44
@ Builder .Default
@@ -60,19 +59,19 @@ public class DefaultPersistor implements Persistor, Validatable {
60
59
61
60
/**
62
61
* @param migrate Set to false to disable automatic database migrations. This may be preferred if
63
- * the default migration behaviour interferes with your existing toolset, and you prefer to
64
- * manage the migrations explicitly (e.g. using FlyWay or Liquibase), or you do not give the
65
- * application DDL permissions at runtime. You may use {@link #writeSchema(Writer)} to access
66
- * the migrations.
62
+ * the default migration behaviour interferes with your existing toolset, and you prefer to
63
+ * manage the migrations explicitly (e.g. using FlyWay or Liquibase), or you do not give the
64
+ * application DDL permissions at runtime. You may use {@link #writeSchema(Writer)} to access
65
+ * the migrations.
67
66
*/
68
67
@ SuppressWarnings ("JavaDoc" )
69
68
@ Builder .Default
70
69
private final boolean migrate = true ;
71
70
72
71
/**
73
72
* @param serializer The serializer to use for {@link Invocation}s. See {@link
74
- * InvocationSerializer} for more information. Defaults to {@link
75
- * InvocationSerializer#createDefaultJsonSerializer()} with no custom serializable classes.
73
+ * InvocationSerializer} for more information. Defaults to {@link
74
+ * InvocationSerializer#createDefaultJsonSerializer()} with no custom serializable classes.
76
75
*/
77
76
@ SuppressWarnings ("JavaDoc" )
78
77
@ Builder .Default
@@ -151,9 +150,9 @@ private void setupInsert(
151
150
@ Override
152
151
public void delete (Transaction tx , TransactionOutboxEntry entry ) throws Exception {
153
152
try (PreparedStatement stmt =
154
- // language=MySQL
155
- tx .connection ()
156
- .prepareStatement ("DELETE FROM " + tableName + " WHERE id = ? and version = ?" )) {
153
+ // language=MySQL
154
+ tx .connection ()
155
+ .prepareStatement ("DELETE FROM " + tableName + " WHERE id = ? and version = ?" )) {
157
156
stmt .setString (1 , entry .getId ());
158
157
stmt .setInt (2 , entry .getVersion ());
159
158
if (stmt .executeUpdate () != 1 ) {
@@ -166,14 +165,14 @@ public void delete(Transaction tx, TransactionOutboxEntry entry) throws Exceptio
166
165
@ Override
167
166
public void update (Transaction tx , TransactionOutboxEntry entry ) throws Exception {
168
167
try (PreparedStatement stmt =
169
- tx .connection ()
170
- .prepareStatement (
171
- // language=MySQL
172
- "UPDATE "
173
- + tableName
174
- + " "
175
- + "SET lastAttemptTime = ?, nextAttemptTime = ?, attempts = ?, blocked = ?, processed = ?, version = ? "
176
- + "WHERE id = ? and version = ?" )) {
168
+ tx .connection ()
169
+ .prepareStatement (
170
+ // language=MySQL
171
+ "UPDATE "
172
+ + tableName
173
+ + " "
174
+ + "SET lastAttemptTime = ?, nextAttemptTime = ?, attempts = ?, blocked = ?, processed = ?, version = ? "
175
+ + "WHERE id = ? and version = ?" )) {
177
176
stmt .setTimestamp (
178
177
1 ,
179
178
entry .getLastAttemptTime () == null ? null : Timestamp .from (entry .getLastAttemptTime ()));
@@ -195,17 +194,17 @@ public void update(Transaction tx, TransactionOutboxEntry entry) throws Exceptio
195
194
@ Override
196
195
public boolean lock (Transaction tx , TransactionOutboxEntry entry ) throws Exception {
197
196
try (PreparedStatement stmt =
198
- tx .connection ()
199
- .prepareStatement (
200
- dialect .isSupportsSkipLock ()
201
- // language=MySQL
202
- ? "SELECT id, invocation FROM "
203
- + tableName
204
- + " WHERE id = ? AND version = ? FOR UPDATE SKIP LOCKED"
205
- // language=MySQL
206
- : "SELECT id, invocation FROM "
207
- + tableName
208
- + " WHERE id = ? AND version = ? FOR UPDATE" )) {
197
+ tx .connection ()
198
+ .prepareStatement (
199
+ dialect .isSupportsSkipLock ()
200
+ // language=MySQL
201
+ ? "SELECT id, invocation FROM "
202
+ + tableName
203
+ + " WHERE id = ? AND version = ? FOR UPDATE SKIP LOCKED"
204
+ // language=MySQL
205
+ : "SELECT id, invocation FROM "
206
+ + tableName
207
+ + " WHERE id = ? AND version = ? FOR UPDATE" )) {
209
208
stmt .setString (1 , entry .getId ());
210
209
stmt .setInt (2 , entry .getVersion ());
211
210
stmt .setQueryTimeout (writeLockTimeoutSeconds );
@@ -254,19 +253,19 @@ public List<TransactionOutboxEntry> selectBatch(Transaction tx, int batchSize, I
254
253
throws Exception {
255
254
String forUpdate = dialect .isSupportsSkipLock () ? " FOR UPDATE SKIP LOCKED" : "" ;
256
255
try (PreparedStatement stmt =
257
- tx .connection ()
258
- .prepareStatement (
259
- // language=MySQL
260
- "SELECT "
261
- + ALL_FIELDS
262
- + " FROM "
263
- + tableName
264
- + " WHERE nextAttemptTime < ? AND blocked = "
265
- + dialect .booleanValue (false )
266
- + " AND processed = "
267
- + dialect .booleanValue (false )
268
- + dialect .getLimitCriteria ()
269
- + forUpdate )) {
256
+ tx .connection ()
257
+ .prepareStatement (
258
+ // language=MySQL
259
+ "SELECT "
260
+ + ALL_FIELDS
261
+ + " FROM "
262
+ + tableName
263
+ + " WHERE nextAttemptTime < ? AND blocked = "
264
+ + dialect .booleanValue (false )
265
+ + " AND processed = "
266
+ + dialect .booleanValue (false )
267
+ + dialect .getLimitCriteria ()
268
+ + forUpdate )) {
270
269
stmt .setTimestamp (1 , Timestamp .from (now ));
271
270
stmt .setInt (2 , batchSize );
272
271
return gatherResults (batchSize , stmt );
@@ -277,8 +276,8 @@ public List<TransactionOutboxEntry> selectBatch(Transaction tx, int batchSize, I
277
276
public int deleteProcessedAndExpired (Transaction tx , int batchSize , Instant now )
278
277
throws Exception {
279
278
try (PreparedStatement stmt =
280
- tx .connection ()
281
- .prepareStatement (dialect .getDeleteExpired ().replace ("{{table}}" , tableName ))) {
279
+ tx .connection ()
280
+ .prepareStatement (dialect .getDeleteExpired ().replace ("{{table}}" , tableName ))) {
282
281
stmt .setTimestamp (1 , Timestamp .from (now ));
283
282
stmt .setInt (2 , batchSize );
284
283
return stmt .executeUpdate ();
@@ -330,16 +329,16 @@ public void clear(Transaction tx) throws SQLException {
330
329
@ Override
331
330
public boolean checkConnection (Transaction tx ) throws SQLException {
332
331
try (Statement stmt = tx .connection ().createStatement ();
333
- ResultSet rs = stmt .executeQuery (dialect .getCheckSql ())) {
332
+ ResultSet rs = stmt .executeQuery (dialect .getCheckSql ())) {
334
333
return rs .next () && (rs .getInt (1 ) == 1 );
335
334
}
336
335
}
337
336
338
337
@ Override
339
338
public Optional <TransactionOutboxEntry > load (Transaction tx , String entryId ) throws Exception {
340
339
try (PreparedStatement stmt =
341
- tx .connection ()
342
- .prepareStatement ("SELECT " + ALL_FIELDS + " FROM " + tableName + " WHERE id = ?" )) {
340
+ tx .connection ()
341
+ .prepareStatement ("SELECT " + ALL_FIELDS + " FROM " + tableName + " WHERE id = ?" )) {
343
342
stmt .setString (1 , entryId );
344
343
List <TransactionOutboxEntry > results = gatherResults (1 , stmt );
345
344
if (results .isEmpty ()) {
0 commit comments