Skip to content

Commit 2cb605d

Browse files
committed
Allow mixing Mutiny.Session and Mutiny.StatelessSession in same transaction
This change removes the artificial restriction that prevented using both Mutiny.Session and Mutiny.StatelessSession within the same @transactional method in Hibernate Reactive, bringing it in line with blocking Hibernate ORM. Key changes: - Added thread-safe connection reuse in TransactionalContextPool: when a second session type requests a connection within the same transaction, it reuses the existing connection from the Vert.x context. Uses VarHandle with compareAndSet to prevent race conditions when both session types request connections concurrently. - Extracted ConnectionHolder as a package-protected class that ensures only one connection is created per transaction even with concurrent access, using a Promise pattern inspired by Hibernate Reactive's ProxyConnection - Made TransactionalContextConnection.close() a no-op to prevent premature connection closure, letting only TransactionalInterceptorBase close the actual connection at transaction end - Changed TransactionalContextPool.getCurrentConnectionFromVertxContext() to return a Future instead of the connection directly, allowing callers to handle async connection availability - Updated TransactionalInterceptorBase.connectionFromContext() to return Uni<SqlConnection> and all its callers to handle null checks before converting to Uni - Updated TransactionalInterceptorBase to use TransactionalContextPool.closeAndClearCurrentConnection() to properly close the connection and clear it from the Vert.x context - Removed validation checks in HibernateReactiveRecorder that threw IllegalStateException when mixing session types - Transformed MixStatelessStatefulSessionTest from negative (expecting failures) to positive tests verifying both session types work together, share connections, and commit/rollback together - Updated documentation to explain that mixing is now supported and both session types share the same underlying connection Both session types can now coexist in the same transaction, using the appropriate session type for different operations (e.g., StatelessSession for bulk operations, regular Session for operations requiring caching). Fixes #52828 Assisted-By: Claude Code <noreply@anthropic.com>
1 parent bbdf6a2 commit 2cb605d

File tree

8 files changed

+265
-146
lines changed

8 files changed

+265
-146
lines changed

docs/src/main/asciidoc/hibernate-reactive.adoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -492,7 +492,7 @@ Mixing both transaction management styles in the same reactive pipeline is not s
492492
In the future, we'll deprecate the previous annotations provided by Panache and and support only `@Transactional`.
493493

494494
You can inject either `Mutiny.Session` or `Mutiny.StatelessSession`.
495-
Be careful of injecting both session types in the same bean. Attempting to use both session types within the same transaction will throw an `IllegalStateException`.
495+
Mixing both session types in the same transaction should work, but should be reserved for exotic use cases implemented by advanced users, as the resulting behavior can be subtle.
496496

497497
[[transactional-different-pipelines]]
498498
=== Using Declarative Transaction Management in different reactive pipelines

extensions/hibernate-reactive/deployment/src/test/java/io/quarkus/hibernate/reactive/transaction/mixing/MixStatelessStatefulSessionTest.java

Lines changed: 81 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -14,56 +14,118 @@
1414
import io.quarkus.test.vertx.RunOnVertxContext;
1515
import io.quarkus.test.vertx.UniAsserter;
1616
import io.smallrye.mutiny.Uni;
17+
import io.vertx.mutiny.sqlclient.Pool;
1718

19+
/**
20+
* Verifies that Hibernate Reactive allows mixing regular Session and StatelessSession
21+
* within the same transaction.
22+
*/
1823
public class MixStatelessStatefulSessionTest {
1924

2025
@RegisterExtension
2126
static final QuarkusUnitTest config = new QuarkusUnitTest()
2227
.withApplicationRoot(jar -> jar.addClasses(Hero.class))
23-
.withConfigurationResource("application.properties");
28+
.withConfigurationResource("application-reactive-transaction.properties");
2429

2530
@Inject
2631
Mutiny.Session session;
2732

2833
@Inject
2934
Mutiny.StatelessSession statelessSession;
3035

36+
@Inject
37+
Pool pool;
38+
39+
@Test
40+
@RunOnVertxContext
41+
public void testRegularSessionThenStatelessSessionInTransactional(UniAsserter asserter) {
42+
// Use regular Session, then StatelessSession in same transaction - should work without error
43+
asserter.execute(() -> assertThat(pool.size()).isEqualTo(1));
44+
asserter.assertThat(
45+
() -> transactionalMethodUsingRegularSessionThenStatelessSession(),
46+
count -> assertThat(count).isNotNull());
47+
// Verify pool size is still 1 (connection was shared and released)
48+
asserter.execute(() -> assertThat(pool.size()).isEqualTo(1));
49+
}
50+
3151
@Test
3252
@RunOnVertxContext
3353
public void testStatelessSessionThenRegularSessionInTransactional(UniAsserter asserter) {
34-
asserter.assertFailedWith(
54+
// Use StatelessSession, then regular Session in same transaction - should work without error
55+
asserter.execute(() -> assertThat(pool.size()).isEqualTo(1));
56+
asserter.assertThat(
3557
() -> transactionalMethodUsingStatelessSessionThenRegularSession(),
36-
e -> assertThat(e)
37-
.isInstanceOf(IllegalStateException.class)
38-
.hasMessageContaining("stateless session for the same Persistence Unit is already opened")
39-
.hasMessageContaining(
40-
"Mixing different kinds of sessions in the same transaction is not supported yet"));
58+
count -> assertThat(count).isNotNull());
59+
// Verify pool size is still 1 (connection was shared and released)
60+
asserter.execute(() -> assertThat(pool.size()).isEqualTo(1));
4161
}
4262

4363
@Test
4464
@RunOnVertxContext
45-
public void testRegularSessionThenStatelessSessionInTransactional(UniAsserter asserter) {
65+
public void testBothSessionsCommitTogether(UniAsserter asserter) {
66+
// Make changes via both sessions, verify both commit together
67+
asserter.assertThat(
68+
() -> transactionalMethodCreatingViaBothSessions("CommitHero1", "CommitHero2"),
69+
v -> assertThat(v).isNull());
70+
71+
// Verify both were persisted
72+
asserter.assertThat(
73+
() -> transactionalMethodCountingHeroes("CommitHero%"),
74+
count -> assertThat(count).isEqualTo(2L));
75+
}
76+
77+
@Test
78+
@RunOnVertxContext
79+
public void testBothSessionsRollbackTogether(UniAsserter asserter) {
80+
// Make changes via both sessions, then throw exception - verify both rolled back
4681
asserter.assertFailedWith(
47-
() -> transactionalMethodUsingRegularSessionThenStatelessSession(),
82+
() -> transactionalMethodCreatingViaBothSessionsThenFailing("RollbackHero1", "RollbackHero2"),
4883
e -> assertThat(e)
49-
.isInstanceOf(IllegalStateException.class)
50-
.hasMessageContaining("session for the same Persistence Unit is already opened")
51-
.hasMessageContaining(
52-
"Mixing different kinds of sessions in the same transaction is not supported yet"));
84+
.isInstanceOf(RuntimeException.class)
85+
.hasMessage("Force rollback"));
86+
87+
// Verify neither was persisted
88+
asserter.assertThat(
89+
() -> transactionalMethodCountingHeroes("RollbackHero%"),
90+
count -> assertThat(count).isEqualTo(0L));
5391
}
5492

5593
@Transactional
56-
public Uni<Object> transactionalMethodUsingRegularSessionThenStatelessSession() {
57-
return session.createQuery("select count(1) from Hero h", Long.class).getSingleResult()
58-
.chain(count -> statelessSession.createQuery("select count(1) from Hero h", Long.class)
94+
public Uni<Long> transactionalMethodUsingRegularSessionThenStatelessSession() {
95+
return session.createSelectionQuery("select count(h) from Hero h", Long.class).getSingleResult()
96+
.chain(count -> statelessSession.createSelectionQuery("select count(h) from Hero h", Long.class)
5997
.getSingleResult());
6098
}
6199

62100
@Transactional
63-
public Uni<Object> transactionalMethodUsingStatelessSessionThenRegularSession() {
64-
return statelessSession.createQuery("select count(1) from Hero h", Long.class).getSingleResult()
65-
.chain(count -> session.createQuery("select count(1) from Hero h", Long.class)
101+
public Uni<Long> transactionalMethodUsingStatelessSessionThenRegularSession() {
102+
return statelessSession.createSelectionQuery("select count(h) from Hero h", Long.class).getSingleResult()
103+
.chain(count -> session.createSelectionQuery("select count(h) from Hero h", Long.class)
66104
.getSingleResult());
67105
}
68106

107+
@Transactional
108+
public Uni<Void> transactionalMethodCreatingViaBothSessions(String name1, String name2) {
109+
Hero hero1 = new Hero(name1);
110+
return session.persist(hero1)
111+
.call(() -> session.flush())
112+
.chain(() -> {
113+
Hero hero2 = new Hero(name2);
114+
return statelessSession.insert(hero2);
115+
});
116+
}
117+
118+
@Transactional
119+
public Uni<Void> transactionalMethodCreatingViaBothSessionsThenFailing(String name1, String name2) {
120+
return transactionalMethodCreatingViaBothSessions(name1, name2)
121+
.chain(() -> Uni.createFrom().failure(new RuntimeException("Force rollback")));
122+
}
123+
124+
@Transactional
125+
public Uni<Long> transactionalMethodCountingHeroes(String namePattern) {
126+
return session.createSelectionQuery("select count(h) from Hero h where h.name like :name", Long.class)
127+
.setParameter("name", namePattern)
128+
.getSingleResult();
129+
}
130+
69131
}

extensions/hibernate-reactive/deployment/src/test/resources/application-reactive-transaction.properties

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
quarkus.datasource.db-kind=postgresql
22
quarkus.datasource.reactive=true
3+
# Using max-size=1 ensures that:
4+
# 1. Connection management is tested properly (connection must be released after transaction)
5+
# 2. Session types sharing the same connection is verified (tests would hang if separate connections were used)
36
quarkus.datasource.reactive.max-size=1
47

58
quarkus.hibernate-orm.log.sql=true

extensions/hibernate-reactive/runtime/src/main/java/io/quarkus/hibernate/reactive/runtime/HibernateReactiveRecorder.java

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -134,17 +134,6 @@ public static Mutiny.Session getSession(String persistenceUnitName) {
134134
} else if (context.getLocal(TRANSACTIONAL_METHOD_KEY) == null) {
135135
throw new IllegalStateException(noSessionFoundErrorMessage());
136136
} else {
137-
138-
Optional<OpenedSessionsState.SessionWithKey<Mutiny.StatelessSession>> openedStatelessSession = OPENED_SESSIONS_STATE_STATELESS
139-
.getOpenedSession(
140-
context,
141-
persistenceUnitName);
142-
143-
if (openedStatelessSession.isPresent()) {
144-
throw new IllegalStateException("A stateless session for the same Persistence Unit is already opened."
145-
+ "\n\t- Mixing different kinds of sessions in the same transaction is not supported yet.");
146-
}
147-
148137
// Store the persistence unit name so that we can close only this session at the end of the interceptor
149138
context.putLocal(PERSISTENCE_UNIT_NAME_KEY, persistenceUnitName);
150139

@@ -179,17 +168,6 @@ public static Mutiny.StatelessSession getStatelessSession(String persistenceUnit
179168
} else if (context.getLocal(TRANSACTIONAL_METHOD_KEY) == null) {
180169
throw new IllegalStateException(noSessionFoundErrorMessage());
181170
} else {
182-
183-
Optional<OpenedSessionsState.SessionWithKey<Mutiny.Session>> openedRegularSession = OPENED_SESSIONS_STATE
184-
.getOpenedSession(
185-
context,
186-
persistenceUnitName);
187-
188-
if (openedRegularSession.isPresent()) {
189-
throw new IllegalStateException("A (non-stateless) session for the same Persistence Unit is already opened."
190-
+ "\n\t- Mixing different kinds of sessions in the same transaction is not supported yet.");
191-
}
192-
193171
// Store the persistence unit name so that we can close only this session at the end of the interceptor
194172
context.putLocal(PERSISTENCE_UNIT_NAME_KEY, persistenceUnitName);
195173

extensions/reactive-transactions/runtime/src/main/java/io/quarkus/reactive/transaction/runtime/TransactionalInterceptorBase.java

Lines changed: 78 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import org.jboss.logging.Logger;
1212

1313
import io.quarkus.arc.runtime.InterceptorBindings;
14-
import io.quarkus.reactive.transaction.ReactiveResource;
1514
import io.quarkus.reactive.transaction.runtime.pool.TransactionalContextPool;
1615
import io.quarkus.transaction.annotations.Rollback;
1716
import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle;
@@ -112,12 +111,19 @@ private Uni<Void> invokeBeforeCommitAndCommit(Context context) {
112111
.onItem().invoke(() -> LOG.tracef("Flushed the session before commit/rollback"))
113112
.onItemOrFailure().call((result, exception) -> {
114113
if (exception != null) {
115-
SqlConnection connection = connectionFromContext();
116-
return actualRollback(connection.transaction(), exception).invoke(() -> {
117-
// onItemOrFailure() will still propagate the chain even with an execption
118-
// we need to rethrow it to make sure the reactive chain fails
119-
throw new RuntimeException("Transaction rolled back due to: ", exception);
120-
});
114+
Uni<SqlConnection> connectionUni = connectionFromContext();
115+
if (connectionUni == null) {
116+
LOG.tracef("Transaction doesn't exist, cannot rollback, propagating original exception");
117+
return Uni.createFrom().failure(
118+
new RuntimeException("Transaction rolled back due to: ", exception));
119+
}
120+
return connectionUni
121+
.onItem()
122+
.transformToUni(connection -> actualRollback(connection.transaction(), exception).invoke(() -> {
123+
// onItemOrFailure() will still propagate the chain even with an execption
124+
// we need to rethrow it to make sure the reactive chain fails
125+
throw new RuntimeException("Transaction rolled back due to: ", exception);
126+
}));
121127
} else {
122128
return commit();
123129
}
@@ -131,19 +137,22 @@ private Uni<?> closeConnection() {
131137
LOG.tracef("Connection doesn't exist, nothing to do here");
132138
return Uni.createFrom().nullItem();
133139
}
134-
SqlConnection connection = connectionFromContext();
135-
LOG.tracef("Closing the connection %s", connection);
136-
return toUni(closeFuture);
140+
return toUni(closeFuture)
141+
.invoke(connection -> LOG.tracef("Closing the connection %s", connection));
137142
}
138143

139-
SqlConnection connectionFromContext() {
140-
return TransactionalContextPool.getCurrentConnectionFromVertxContext();
144+
Uni<SqlConnection> connectionFromContext() {
145+
Future<? extends SqlConnection> future = TransactionalContextPool.getCurrentConnectionFromVertxContext();
146+
if (future == null) {
147+
return null;
148+
}
149+
return toUni(future);
141150
}
142151

143152
// Based on org/hibernate/reactive/pool/impl/SqlClientConnection.java:305
144153
Uni<Void> commit() {
145-
SqlConnection connection = connectionFromContext();
146-
if (connection == null || connection.transaction() == null) {
154+
Uni<SqlConnection> connectionUni = connectionFromContext();
155+
if (connectionUni == null) {
147156
// This might happen if the method is annotated with @Transactional but doesn't flush
148157
// i.e. a single persist without an explicit .flush()
149158
// We then avoid committing the transaction here, and we rely on Hibernate Reactive
@@ -152,61 +161,82 @@ Uni<Void> commit() {
152161
return Uni.createFrom().nullItem();
153162
}
154163

155-
return toUni(connection.transaction().commit())
156-
.onFailure().invoke(() -> LOG.tracef("Failed to commit transaction: %s", connection))
157-
.invoke(() -> LOG.tracef("Transaction committed: %s", connection));
164+
return connectionUni
165+
.onItem().transformToUni(connection -> {
166+
if (connection.transaction() == null) {
167+
LOG.tracef("Transaction doesn't exist, so won't commit here");
168+
return Uni.createFrom().nullItem();
169+
}
170+
return toUni(connection.transaction().commit())
171+
.onFailure().invoke(() -> LOG.tracef("Failed to commit transaction: %s", connection))
172+
.invoke(() -> LOG.tracef("Transaction committed: %s", connection));
173+
});
158174
}
159175

160-
private static <T> Uni<T> toUni(Future<T> future) {
176+
private static <T> Uni<T> toUni(Future<? extends T> future) {
161177
return Uni.createFrom()
162178
.emitter(emitter -> future.onComplete(emitter::complete, emitter::fail));
163179
}
164180

165181
Uni<Void> rollbackOnCancel() {
166-
SqlConnection connection = connectionFromContext();
167-
Transaction transaction = connection.transaction();
168-
return toUni(transaction.rollback())
169-
.onFailure().invoke(() -> LOG.tracef("Failed to rollback transaction on cancellation: %s", connection))
170-
.invoke(() -> LOG.tracef("Transaction rolled back due to cancellation: %s", transaction));
182+
Uni<SqlConnection> connectionUni = connectionFromContext();
183+
if (connectionUni == null) {
184+
LOG.tracef("Transaction doesn't exist, so won't roll back");
185+
return Uni.createFrom().nullItem();
186+
}
187+
return connectionUni.onItem().transformToUni(connection -> {
188+
Transaction transaction = connection.transaction();
189+
return toUni(transaction.rollback())
190+
.onFailure()
191+
.invoke(() -> LOG.tracef("Failed to rollback transaction on cancellation: %s", connection))
192+
.invoke(() -> LOG.tracef("Transaction rolled back due to cancellation: %s", transaction));
193+
});
171194
}
172195

173196
// Based on org/hibernate/reactive/pool/impl/SqlClientConnection.java:314
174197
Uni<Void> rollbackOrCommitBasedOnException(Context context, Transactional annotation, Throwable exception) {
175-
SqlConnection connection = connectionFromContext();
176-
177198
for (Class<?> dontRollbackOnClass : annotation.dontRollbackOn()) {
178199
if (dontRollbackOnClass.isAssignableFrom(exception.getClass())) {
179200
LOG.trace("Avoid rollback due to `dontRollbackOn` on `@Transactional` annotation, committing instead");
180201
return invokeBeforeCommitAndCommit(context);
181202
}
182203
}
183204

184-
for (Class<?> rollbackOnClass : annotation.rollbackOn()) {
185-
if (rollbackOnClass.isAssignableFrom(exception.getClass())) {
186-
LOG.tracef(
187-
"Rollback the transaction due to exception class %s included in `rollbackOn` field on `@Transactional` annotation",
188-
exception.getClass());
189-
return actualRollback(connection.transaction(), exception);
190-
}
205+
Uni<SqlConnection> connectionUni = connectionFromContext();
206+
if (connectionUni == null) {
207+
LOG.tracef("Transaction doesn't exist, so won't commit or roll back");
208+
return Uni.createFrom().nullItem();
191209
}
210+
return connectionUni
211+
.onItem().transformToUni(connection -> {
212+
for (Class<?> rollbackOnClass : annotation.rollbackOn()) {
213+
if (rollbackOnClass.isAssignableFrom(exception.getClass())) {
214+
LOG.tracef(
215+
"Rollback the transaction due to exception class %s included in `rollbackOn` field on `@Transactional` annotation",
216+
exception.getClass());
217+
return actualRollback(connection.transaction(), exception);
218+
}
219+
}
192220

193-
Rollback rollbackAnnotation = exception.getClass().getAnnotation(Rollback.class);
194-
if (rollbackAnnotation != null) {
195-
if (rollbackAnnotation.value()) {
196-
LOG.tracef("Rollback the transaction as the exception class %s is annotated with `@Rollback` annotation",
197-
exception.getClass());
198-
return actualRollback(connection.transaction(), exception);
199-
} else {
200-
LOG.tracef(
201-
"Do not rollback the transaction as the exception class %s is annotated with `@Rollback(false)` annotation",
202-
exception.getClass());
203-
return invokeBeforeCommitAndCommit(context);
204-
}
205-
}
221+
Rollback rollbackAnnotation = exception.getClass().getAnnotation(Rollback.class);
222+
if (rollbackAnnotation != null) {
223+
if (rollbackAnnotation.value()) {
224+
LOG.tracef(
225+
"Rollback the transaction as the exception class %s is annotated with `@Rollback` annotation",
226+
exception.getClass());
227+
return actualRollback(connection.transaction(), exception);
228+
} else {
229+
LOG.tracef(
230+
"Do not rollback the transaction as the exception class %s is annotated with `@Rollback(false)` annotation",
231+
exception.getClass());
232+
return invokeBeforeCommitAndCommit(context);
233+
}
234+
}
206235

207-
// Default behavior: rollback for RuntimeException and Error (unchecked exceptions)
208-
// Note: Mutiny wraps checked exceptions in CompletionException, so they appear as RuntimeException here
209-
return actualRollback(connection.transaction(), exception);
236+
// Default behavior: rollback for RuntimeException and Error (unchecked exceptions)
237+
// Note: Mutiny wraps checked exceptions in CompletionException, so they appear as RuntimeException here
238+
return actualRollback(connection.transaction(), exception);
239+
});
210240
}
211241

212242
private Uni<Void> actualRollback(Transaction transaction, Throwable exception) {

0 commit comments

Comments
 (0)