Skip to content

Commit bbdf6a2

Browse files
committed
Move TransactionalContextPool and TransactionalContextConnection to reactive-transactions
Moved TransactionalContextPool and TransactionalContextConnection from hibernate-reactive to reactive-transactions module, as they are not specific to Hibernate Reactive and are used by the transaction interceptors. Additionally, introduced getCurrentConnectionFromVertxContext() method in TransactionalContextPool for consistent access to the current connection from the Vert.x context. Assisted-By: Claude Code <noreply@anthropic.com>
1 parent 5f32085 commit bbdf6a2

File tree

5 files changed

+70
-18
lines changed

5 files changed

+70
-18
lines changed

extensions/hibernate-reactive/runtime/src/main/java/io/quarkus/hibernate/reactive/runtime/customized/QuarkusReactiveConnectionPoolInitiator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public ReactiveConnectionPool initiateService(Map configurationValues, ServiceRe
3131
// nothing to do, but given the separate hierarchies have to handle this here.
3232
return null;
3333
}
34-
return new QuarkusSqlClientPool(new TransactionalContextPool(pool));
34+
return new QuarkusSqlClientPool(new io.quarkus.reactive.transaction.runtime.pool.TransactionalContextPool(pool));
3535
}
3636

3737
}

extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/runtime/TransactionalInterceptorBase.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
import org.jboss.logging.Logger;
1212

1313
import io.quarkus.arc.runtime.InterceptorBindings;
14+
import io.quarkus.reactive.transaction.ReactiveResource;
15+
import io.quarkus.reactive.transaction.runtime.pool.TransactionalContextPool;
1416
import io.quarkus.transaction.annotations.Rollback;
1517
import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle;
1618
import io.smallrye.mutiny.Uni;
@@ -28,10 +30,6 @@
2830
*/
2931
public abstract class TransactionalInterceptorBase {
3032

31-
// Used in this class and in TransactionalContextPool to store and get
32-
// the connection with the lazily created Transaction inside the Vert.x Context
33-
public static final String CURRENT_CONNECTION_KEY = "reactive.transaction.currentConnection";
34-
3533
// This key is used to indicate the method was annotated with @Transactional
3634
// And will open a session and a transaction lazily when the first operation requires a reactive session
3735
// Check HibernateReactiveRecorder.sessionSupplier to see where the session is injected
@@ -127,18 +125,19 @@ private Uni<Void> invokeBeforeCommitAndCommit(Context context) {
127125
}
128126

129127
private Uni<?> closeConnection() {
130-
SqlConnection connection = connectionFromContext();
131-
if (connection == null) {
128+
Future<Void> closeFuture = TransactionalContextPool.closeAndClearCurrentConnection();
129+
if (closeFuture == null) {
132130
// io/quarkus/hibernate/reactive/transaction/DisableJTATransactionTest.java:38
133131
LOG.tracef("Connection doesn't exist, nothing to do here");
134132
return Uni.createFrom().nullItem();
135133
}
134+
SqlConnection connection = connectionFromContext();
136135
LOG.tracef("Closing the connection %s", connection);
137-
return toUni(connection.close());
136+
return toUni(closeFuture);
138137
}
139138

140139
SqlConnection connectionFromContext() {
141-
return Vertx.currentContext().getLocal(CURRENT_CONNECTION_KEY);
140+
return TransactionalContextPool.getCurrentConnectionFromVertxContext();
142141
}
143142

144143
// Based on org/hibernate/reactive/pool/impl/SqlClientConnection.java:305

extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/runtime/TransactionalInterceptorRequired.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
import jakarta.interceptor.InvocationContext;
99
import jakarta.transaction.Transactional;
1010

11+
import io.quarkus.reactive.transaction.ReactiveResource;
12+
1113
@Transactional(Transactional.TxType.REQUIRED)
1214
@Interceptor
1315
@Priority(Interceptor.Priority.PLATFORM_BEFORE + 300)

extensions/hibernate-reactive/runtime/src/main/java/io/quarkus/hibernate/reactive/runtime/customized/TransactionalContextConnection.java renamed to extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/runtime/pool/TransactionalContextConnection.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package io.quarkus.hibernate.reactive.runtime.customized;
1+
package io.quarkus.reactive.transaction.runtime.pool;
22

33
import org.jboss.logging.Logger;
44

@@ -86,7 +86,8 @@ public boolean isSSL() {
8686

8787
@Override
8888
public void close(Handler<AsyncResult<Void>> handler) {
89-
connection.close(handler);
89+
LOG.tracef("Calling no-op close on TransactionalContextConnection it will close after the closing of session");
90+
handler.handle(Future.succeededFuture());
9091
}
9192

9293
@Override
@@ -114,4 +115,8 @@ public Future<Void> close() {
114115
LOG.tracef("Calling no-op close on TransactionalContextConnection it will close after the closing of session");
115116
return Future.succeededFuture();
116117
}
118+
119+
SqlConnection getDelegate() {
120+
return connection;
121+
}
117122
}

extensions/hibernate-reactive/runtime/src/main/java/io/quarkus/hibernate/reactive/runtime/customized/TransactionalContextPool.java renamed to extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/runtime/pool/TransactionalContextPool.java

Lines changed: 53 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package io.quarkus.hibernate.reactive.runtime.customized;
1+
package io.quarkus.reactive.transaction.runtime.pool;
22

33
import static io.quarkus.reactive.transaction.runtime.TransactionalInterceptorBase.CURRENT_CONNECTION_KEY;
44
import static io.quarkus.reactive.transaction.runtime.TransactionalInterceptorBase.TRANSACTIONAL_METHOD_KEY;
@@ -28,6 +28,9 @@ public class TransactionalContextPool implements Pool {
2828

2929
private static final Logger LOG = Logger.getLogger(TransactionalContextPool.class);
3030

31+
// Key to store the wrapped connection for reuse by multiple sessions
32+
private static final String CURRENT_CONNECTION_KEY = "reactive.transaction.currentConnection";
33+
3134
private final Pool delegate;
3235

3336
public TransactionalContextPool(Pool delegate) {
@@ -39,6 +42,13 @@ public void getConnection(Handler<AsyncResult<SqlConnection>> handler) {
3942
if (!shouldOpenTransaction()) {
4043
delegate.getConnection(handler);
4144
} else {
45+
// Check if a connection already exists in the context (from a previous session in the same transaction)
46+
TransactionalContextConnection existingConnection = getCurrentConnectionFromVertxContext();
47+
if (existingConnection != null) {
48+
LOG.tracef("Reusing existing wrapped connection from context: %s", existingConnection);
49+
handler.handle(Future.succeededFuture(existingConnection));
50+
return;
51+
}
4252
delegate.getConnection(result -> {
4353
if (result.failed()) {
4454
handler.handle(result);
@@ -47,8 +57,9 @@ public void getConnection(Handler<AsyncResult<SqlConnection>> handler) {
4757
var connection = result.result();
4858
connection.begin()
4959
.map(transaction -> {
50-
storeConnectionInVertxContext(connection);
51-
return (SqlConnection) new TransactionalContextConnection(connection);
60+
TransactionalContextConnection wrappedConnection = new TransactionalContextConnection(connection);
61+
storeConnectionInVertxContext(connection, wrappedConnection);
62+
return (SqlConnection) wrappedConnection;
5263
})
5364
.andThen(handler);
5465
});
@@ -60,20 +71,55 @@ public Future<SqlConnection> getConnection() {
6071
if (!shouldOpenTransaction()) {
6172
return delegate.getConnection();
6273
} else {
74+
// Check if a connection already exists in the context (from a previous session in the same transaction)
75+
TransactionalContextConnection existingConnection = getCurrentConnectionFromVertxContext();
76+
if (existingConnection != null) {
77+
LOG.tracef("Reusing existing wrapped connection from context: %s", existingConnection);
78+
return Future.succeededFuture(existingConnection);
79+
}
6380
return delegate.getConnection()
6481
.compose(connection -> {
6582
LOG.tracef("New connection, about to start transaction: %s", connection);
6683
return connection.begin().map(t -> {
6784
LOG.tracef("Transaction started: %s", connection);
68-
storeConnectionInVertxContext(connection);
69-
return new TransactionalContextConnection(connection);
85+
TransactionalContextConnection wrappedConnection = new TransactionalContextConnection(connection);
86+
storeConnectionInVertxContext(connection, wrappedConnection);
87+
return (SqlConnection) wrappedConnection;
7088
});
7189
});
7290
}
7391
}
7492

75-
private static void storeConnectionInVertxContext(SqlConnection connection) {
76-
Vertx.currentContext().putLocal(CURRENT_CONNECTION_KEY, connection);
93+
private static void storeConnectionInVertxContext(SqlConnection rawConnection,
94+
TransactionalContextConnection wrappedConnection) {
95+
Context context = Vertx.currentContext();
96+
// Store wrapped connection for reuse by other sessions and to retrieve delegate for closing
97+
context.putLocal(CURRENT_CONNECTION_KEY, wrappedConnection);
98+
}
99+
100+
public static TransactionalContextConnection getCurrentConnectionFromVertxContext() {
101+
Context context = Vertx.currentContext();
102+
return context != null ? context.getLocal(CURRENT_CONNECTION_KEY) : null;
103+
}
104+
105+
/**
106+
* Closes the current connection and clears it from the Vertx context.
107+
* This should be called by TransactionalInterceptorBase at the end of the transaction.
108+
*
109+
* @return a Future that completes when the connection is closed, or null if no connection exists
110+
*/
111+
public static Future<Void> closeAndClearCurrentConnection() {
112+
TransactionalContextConnection wrappedConnection = getCurrentConnectionFromVertxContext();
113+
if (wrappedConnection == null) {
114+
return null;
115+
}
116+
SqlConnection delegateConnection = wrappedConnection.getDelegate();
117+
return delegateConnection.close().andThen(ar -> {
118+
Context context = Vertx.currentContext();
119+
if (context != null) {
120+
context.removeLocal(CURRENT_CONNECTION_KEY);
121+
}
122+
});
77123
}
78124

79125
private boolean shouldOpenTransaction() {

0 commit comments

Comments
 (0)