Skip to content

Commit fed6a04

Browse files
committed
#80 fix handling of nested transactions
1 parent ddc775d commit fed6a04

File tree

5 files changed

+206
-154
lines changed

5 files changed

+206
-154
lines changed

src/main/java/com/arangodb/springframework/repository/query/QueryTransactionBridge.java

+26-6
Original file line numberDiff line numberDiff line change
@@ -29,21 +29,41 @@
2929
* Bridge to postpone late transaction start to be able to inject collections from query side.
3030
*/
3131
public class QueryTransactionBridge {
32-
private static final Function<Collection<String>, String> NO_TRANSACTION = any -> null;
33-
private static final ThreadLocal<Function<Collection<String>, String>> CURRENT_TRANSACTION = new NamedInheritableThreadLocal<>("ArangoTransactionBegin");
3432

35-
public QueryTransactionBridge() {
36-
CURRENT_TRANSACTION.set(NO_TRANSACTION);
37-
}
33+
private static final ThreadLocal<Function<Collection<String>, String>> CURRENT_TRANSACTION = new NamedInheritableThreadLocal<Function<Collection<String>, String>>("ArangoTransactionBegin") {
34+
@Override
35+
protected Function<Collection<String>, String> initialValue() {
36+
return any -> null;
37+
}
38+
};
3839

40+
/**
41+
* Prepare the bridge for accepting transaction begin.
42+
* @param begin a function accepting collection names and returning a stream transaction id
43+
*
44+
* @see com.arangodb.springframework.transaction.ArangoTransactionManager
45+
*/
3946
public void setCurrentTransaction(Function<Collection<String>, String> begin) {
4047
CURRENT_TRANSACTION.set(begin);
4148
}
4249

50+
/**
51+
* Reset the bridge ignoring transaction begin.
52+
*
53+
* @see com.arangodb.springframework.transaction.ArangoTransactionManager
54+
*/
4355
public void clearCurrentTransaction() {
44-
CURRENT_TRANSACTION.set(NO_TRANSACTION);
56+
CURRENT_TRANSACTION.remove();
4557
}
4658

59+
/**
60+
* Applies the collection names to any current transaction.
61+
* @param collections additional collection names
62+
* @return the stream transaction id or {@code null} without transaction
63+
*
64+
* @see AbstractArangoQuery
65+
* @see com.arangodb.springframework.repository.SimpleArangoRepository
66+
*/
4767
public String getCurrentTransaction(Collection<String> collections) {
4868
return CURRENT_TRANSACTION.get().apply(collections);
4969
}

src/main/java/com/arangodb/springframework/transaction/ArangoTransactionResource.java renamed to src/main/java/com/arangodb/springframework/transaction/ArangoTransactionHolder.java

+21-30
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,12 @@
2020

2121
package com.arangodb.springframework.transaction;
2222

23+
import com.arangodb.entity.StreamTransactionEntity;
24+
import com.arangodb.entity.StreamTransactionStatus;
2325
import org.springframework.lang.Nullable;
2426
import org.springframework.transaction.support.TransactionSynchronizationManager;
2527

28+
import java.util.Collection;
2629
import java.util.HashSet;
2730
import java.util.Set;
2831

@@ -32,53 +35,41 @@
3235
* @see TransactionSynchronizationManager#bindResource(Object, Object)
3336
* @see ArangoTransactionObject
3437
*/
35-
class ArangoTransactionResource {
38+
class ArangoTransactionHolder {
3639

37-
private String streamTransactionId;
38-
private Set<String> collectionNames;
39-
40-
private boolean rollbackOnly;
41-
int references = 0;
42-
43-
ArangoTransactionResource(@Nullable String streamTransactionId, Set<String> collectionNames, boolean rollbackOnly) {
44-
this.streamTransactionId = streamTransactionId;
45-
setCollectionNames(collectionNames);
46-
this.rollbackOnly = rollbackOnly;
47-
}
40+
private final Set<String> collectionNames = new HashSet<>();
41+
private StreamTransactionEntity transaction = null;
42+
private boolean rollbackOnly = false;
4843

44+
@Nullable
4945
String getStreamTransactionId() {
50-
return streamTransactionId;
46+
return transaction == null ? null : transaction.getId();
5147
}
5248

53-
void setStreamTransactionId(String streamTransactionId) {
54-
this.streamTransactionId = streamTransactionId;
49+
void setStreamTransaction(StreamTransactionEntity transaction) {
50+
this.transaction = transaction;
5551
}
5652

5753
Set<String> getCollectionNames() {
5854
return collectionNames;
5955
}
6056

61-
void setCollectionNames(Set<String> collectionNames) {
62-
this.collectionNames = new HashSet<>(collectionNames);
57+
void addCollectionNames(Collection<String> collectionNames) {
58+
if (transaction != null) {
59+
throw new IllegalStateException("Collections must not be added after stream transaction begun");
60+
}
61+
this.collectionNames.addAll(collectionNames);
6362
}
6463

6564
boolean isRollbackOnly() {
66-
return rollbackOnly;
67-
}
68-
69-
void setRollbackOnly(boolean rollbackOnly) {
70-
this.rollbackOnly = rollbackOnly;
71-
}
72-
73-
void increaseReferences() {
74-
++references;
65+
return rollbackOnly || isStatus(StreamTransactionStatus.aborted);
7566
}
7667

77-
boolean isSingleReference() {
78-
return references <= 1;
68+
void setRollbackOnly() {
69+
rollbackOnly = true;
7970
}
8071

81-
void decreasedReferences() {
82-
--references;
72+
public boolean isStatus(StreamTransactionStatus status) {
73+
return transaction != null && transaction.getStatus() == status;
8374
}
8475
}

src/main/java/com/arangodb/springframework/transaction/ArangoTransactionManager.java

+80-41
Original file line numberDiff line numberDiff line change
@@ -23,61 +23,85 @@
2323
import com.arangodb.ArangoDBException;
2424
import com.arangodb.ArangoDatabase;
2525
import com.arangodb.DbName;
26-
import com.arangodb.model.StreamTransactionOptions;
2726
import com.arangodb.springframework.core.ArangoOperations;
2827
import com.arangodb.springframework.repository.query.QueryTransactionBridge;
28+
import org.springframework.beans.factory.InitializingBean;
29+
import org.springframework.lang.Nullable;
2930
import org.springframework.transaction.*;
3031
import org.springframework.transaction.support.AbstractPlatformTransactionManager;
3132
import org.springframework.transaction.support.DefaultTransactionStatus;
3233
import org.springframework.transaction.support.TransactionSynchronizationManager;
3334

35+
import java.util.function.Function;
36+
3437
/**
3538
* Transaction manager using ArangoDB stream transactions on the
36-
* {@linkplain ArangoOperations#getDatabaseName()} current database} of the template.
37-
* Isolation level {@linkplain TransactionDefinition#ISOLATION_SERIALIZABLE serializable} is not supported.
39+
* {@linkplain ArangoOperations#getDatabaseName() current database} of the
40+
* template. A {@linkplain ArangoTransactionObject transaction object} using
41+
* a shared {@linkplain ArangoTransactionHolder holder} is used for the
42+
* {@link DefaultTransactionStatus}. Neither
43+
* {@linkplain TransactionDefinition#getPropagationBehavior() propagation}
44+
* {@linkplain TransactionDefinition#PROPAGATION_NESTED nested} nor
45+
* {@linkplain TransactionDefinition#getIsolationLevel() isolation}
46+
* {@linkplain TransactionDefinition#ISOLATION_SERIALIZABLE serializable} are
47+
* supported.
3848
*/
39-
public class ArangoTransactionManager extends AbstractPlatformTransactionManager {
49+
public class ArangoTransactionManager extends AbstractPlatformTransactionManager implements InitializingBean {
4050

4151
private final ArangoOperations operations;
4252
private final QueryTransactionBridge bridge;
4353

4454
public ArangoTransactionManager(ArangoOperations operations, QueryTransactionBridge bridge) {
4555
this.operations = operations;
4656
this.bridge = bridge;
47-
setValidateExistingTransaction(true);
57+
super.setGlobalRollbackOnParticipationFailure(true);
58+
super.setTransactionSynchronization(SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
59+
}
60+
61+
/**
62+
* Check for supported property settings.
63+
*/
64+
@Override
65+
public void afterPropertiesSet() {
66+
if (isNestedTransactionAllowed()) {
67+
throw new IllegalStateException("Nested transactions must not be allowed");
68+
}
69+
if (!isGlobalRollbackOnParticipationFailure()) {
70+
throw new IllegalStateException("Global rollback on participating failure is needed");
71+
}
72+
if (getTransactionSynchronization() == SYNCHRONIZATION_NEVER) {
73+
throw new IllegalStateException("Transaction synchronization is needed always");
74+
}
4875
}
4976

5077
/**
51-
* Creates a new transaction object. Any synchronized resource will be reused.
78+
* Creates a new transaction object. Any holder bound will be reused.
5279
*/
5380
@Override
5481
protected ArangoTransactionObject doGetTransaction() {
5582
DbName database = operations.getDatabaseName();
56-
if (logger.isDebugEnabled()) {
57-
logger.debug("Create new transaction for database " + database);
58-
}
83+
ArangoTransactionHolder holder = (ArangoTransactionHolder) TransactionSynchronizationManager.getResource(database);
5984
try {
60-
ArangoTransactionResource resource = (ArangoTransactionResource) TransactionSynchronizationManager.getResource(database);
61-
return new ArangoTransactionObject(operations.driver().db(database), getDefaultTimeout(), resource);
85+
return new ArangoTransactionObject(operations.driver().db(database), getDefaultTimeout(), holder);
6286
} catch (ArangoDBException error) {
6387
throw new TransactionSystemException("Cannot create transaction object", error);
6488
}
6589
}
6690

6791
/**
68-
* Configures the new transaction object. The resulting resource will be synchronized and the bridge will be initialized.
92+
* Connect the new transaction object to the query bridge.
6993
*
70-
* @see ArangoDatabase#beginStreamTransaction(StreamTransactionOptions)
71-
* @see QueryTransactionBridge
94+
* @see QueryTransactionBridge#setCurrentTransaction(Function)
95+
* @see #prepareSynchronization(DefaultTransactionStatus, TransactionDefinition)
96+
* @throws InvalidIsolationLevelException for {@link TransactionDefinition#ISOLATION_SERIALIZABLE}
7297
*/
7398
@Override
74-
protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionUsageException {
99+
protected void doBegin(Object transaction, TransactionDefinition definition) throws InvalidIsolationLevelException {
75100
int isolationLevel = definition.getIsolationLevel();
76-
if (isolationLevel != -1 && (isolationLevel & TransactionDefinition.ISOLATION_SERIALIZABLE) != 0) {
101+
if (isolationLevel != TransactionDefinition.ISOLATION_DEFAULT && (isolationLevel & TransactionDefinition.ISOLATION_SERIALIZABLE) != 0) {
77102
throw new InvalidIsolationLevelException("ArangoDB does not support isolation level serializable");
78103
}
79104
ArangoTransactionObject tx = (ArangoTransactionObject) transaction;
80-
tx.configure(definition);
81105
bridge.setCurrentTransaction(collections -> {
82106
try {
83107
return tx.getOrBegin(collections).getStreamTransactionId();
@@ -88,9 +112,11 @@ protected void doBegin(Object transaction, TransactionDefinition definition) thr
88112
}
89113

90114
/**
91-
* Commit the current stream transaction iff any. The bridge is cleared afterwards.
115+
* Commit the current stream transaction. The query bridge is cleared
116+
* afterwards.
92117
*
93118
* @see ArangoDatabase#commitStreamTransaction(String)
119+
* @see QueryTransactionBridge#clearCurrentTransaction()
94120
*/
95121
@Override
96122
protected void doCommit(DefaultTransactionStatus status) throws TransactionException {
@@ -100,16 +126,19 @@ protected void doCommit(DefaultTransactionStatus status) throws TransactionExcep
100126
}
101127
try {
102128
tx.commit();
103-
bridge.clearCurrentTransaction();
104129
} catch (ArangoDBException error) {
105130
throw new TransactionSystemException("Cannot commit transaction " + tx, error);
131+
} finally {
132+
bridge.clearCurrentTransaction();
106133
}
107134
}
108135

109136
/**
110-
* Roll back the current stream transaction iff any. The bridge is cleared afterwards.
137+
* Roll back the current stream transaction. The query bridge is cleared
138+
* afterwards.
111139
*
112140
* @see ArangoDatabase#abortStreamTransaction(String)
141+
* @see QueryTransactionBridge#clearCurrentTransaction()
113142
*/
114143
@Override
115144
protected void doRollback(DefaultTransactionStatus status) throws TransactionException {
@@ -119,58 +148,68 @@ protected void doRollback(DefaultTransactionStatus status) throws TransactionExc
119148
}
120149
try {
121150
tx.rollback();
122-
bridge.clearCurrentTransaction();
123151
} catch (ArangoDBException error) {
124152
throw new TransactionSystemException("Cannot roll back transaction " + tx, error);
153+
} finally {
154+
bridge.clearCurrentTransaction();
125155
}
126156
}
127157

128158
/**
129-
* Check if the transaction objects has an underlying stream transaction.
130-
*
131-
* @see ArangoDatabase#getStreamTransaction(String)
159+
* Check if the transaction object has the bound holder. For new
160+
* transactions the holder will be bound afterwards.
132161
*/
133162
@Override
134163
protected boolean isExistingTransaction(Object transaction) throws TransactionException {
135-
return ((ArangoTransactionObject) transaction).exists();
164+
ArangoTransactionHolder holder = ((ArangoTransactionObject) transaction).getHolder();
165+
return holder == TransactionSynchronizationManager.getResource(operations.getDatabaseName());
136166
}
137167

168+
/**
169+
* Mark the transaction as global rollback only.
170+
*
171+
* @see #isGlobalRollbackOnParticipationFailure()
172+
*/
138173
@Override
139174
protected void doSetRollbackOnly(DefaultTransactionStatus status) throws TransactionException {
140175
ArangoTransactionObject tx = (ArangoTransactionObject) status.getTransaction();
141-
tx.setRollbackOnly();
176+
tx.getHolder().setRollbackOnly();
142177
}
143178

179+
/**
180+
* Any transaction object is configured according to the definition upfront.
181+
*
182+
* @see ArangoTransactionObject#configure(TransactionDefinition)
183+
*/
144184
@Override
145-
protected DefaultTransactionStatus newTransactionStatus(TransactionDefinition definition, Object transaction, boolean newTransaction, boolean newSynchronization, boolean debug, Object suspendedResources) {
185+
protected DefaultTransactionStatus newTransactionStatus(TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction, boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {
186+
if (transaction instanceof ArangoTransactionObject) {
187+
((ArangoTransactionObject) transaction).configure(definition);
188+
}
146189
return super.newTransactionStatus(definition, transaction, newTransaction, newSynchronization, debug, suspendedResources);
147190
}
148191

149192
/**
150-
* Bind the resource for the first new transaction created.
193+
* Bind the holder for the first new transaction created.
194+
*
195+
* @see ArangoTransactionHolder
151196
*/
152197
@Override
153198
protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
154199
super.prepareSynchronization(status, definition);
155-
if (status.isNewTransaction()) {
156-
ArangoTransactionResource resource = ((ArangoTransactionObject) status.getTransaction()).getResource();
157-
resource.increaseReferences();
158-
if (resource.isSingleReference()) {
159-
TransactionSynchronizationManager.bindResource(operations.getDatabaseName(), resource);
160-
}
200+
if (status.isNewSynchronization()) {
201+
ArangoTransactionHolder holder = ((ArangoTransactionObject) status.getTransaction()).getHolder();
202+
TransactionSynchronizationManager.bindResource(operations.getDatabaseName(), holder);
161203
}
162204
}
163205

164206
/**
165-
* Unbind the resource for the last transaction completed.
207+
* Unbind the holder from the last transaction completed.
208+
*
209+
* @see ArangoTransactionHolder
166210
*/
167211
@Override
168212
protected void doCleanupAfterCompletion(Object transaction) {
169-
ArangoTransactionResource resource = ((ArangoTransactionObject) transaction).getResource();
170-
if (resource.isSingleReference()) {
171-
TransactionSynchronizationManager.unbindResource(operations.getDatabaseName());
172-
}
173-
resource.decreasedReferences();
213+
TransactionSynchronizationManager.unbindResource(operations.getDatabaseName());
174214
}
175-
176215
}

0 commit comments

Comments
 (0)