Open
Description
Greetings!
Please consider the test below (reactor + r2dbc-postgresql + r2dbc-pool). A reactive chain within the test begins a transaction (outer), inserts a record, begins an autonomous transaction (inner). Inner attempts to insert a record with the same data thus being unable to proceed (outer is still in progress) is cancelled by the reactive chain timeout operator:
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.RepeatedTest
import org.junit.jupiter.api.Test
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.r2dbc.core.DatabaseClient
import org.springframework.transaction.ReactiveTransactionManager
import org.springframework.transaction.TransactionDefinition
import org.springframework.transaction.reactive.TransactionalOperator
import org.springframework.transaction.support.DefaultTransactionDefinition
import reactor.core.publisher.Mono
import java.time.Duration
import java.util.*
import java.util.concurrent.TimeoutException
class TimeoutsTest : R2dbcRepositoryTestBase() {
@Autowired
private lateinit var databaseClient: DatabaseClient
@Autowired
private lateinit var transactionManager: ReactiveTransactionManager
//@Test
@RepeatedTest(value = 30, failureThreshold = 1)
fun testTimeout() {
// Jdbc is used for the sake of DB initialization only .
SERVER.getJdbcOperations()
?.execute(
"""
DROP TABLE IF EXISTS users_TimeoutsTest;
CREATE TABLE IF NOT EXISTS users_TimeoutsTest (
userId SERIAL PRIMARY KEY,
userName VARCHAR NOT NULL,
email VARCHAR NOT NULL,
CONSTRAINT unique_user_TimeoutsTest UNIQUE (userName),
CONSTRAINT unique_email_TimeoutsTest UNIQUE (email)
);
""".trimIndent()
)
val outerTransactionalOperator = TransactionalOperator.create(
transactionManager,
DefaultTransactionDefinition() .apply {
isolationLevel = TransactionDefinition.ISOLATION_READ_COMMITTED
}
)
val result: Mono<String> =
databaseClient.sql("INSERT INTO users_TimeoutsTest (userId, userName, email) VALUES (1000, 'test', '[email protected]') RETURNING userId")
.map { row, _ -> row.get(0).toString() }
.one()
.flatMap {
val innerTransactionalOperator = TransactionalOperator.create(
transactionManager,
DefaultTransactionDefinition() .apply {
isolationLevel = TransactionDefinition.ISOLATION_REPEATABLE_READ
propagationBehavior = TransactionDefinition.PROPAGATION_REQUIRES_NEW
}
)
databaseClient.sql("INSERT INTO users_TimeoutsTest (userId, userName, email) VALUES (1001, 'test', '[email protected]') RETURNING userId")
.map { row, _ -> row.get(0).toString() }
.one()
.`as`(innerTransactionalOperator::transactional)
.timeout(Duration.ofSeconds(5))
.onErrorResume(TimeoutException::class.java) { Mono.just("Inner failure") }
}
.`as`(outerTransactionalOperator::transactional)
.onErrorResume {
databaseClient.sql("SELECT COUNT(*) FROM users_TimeoutsTest WHERE userId=1000")
.map { row, _ -> row.get(0) }
.one()
.map { if (it == 1) "Inner failure" else "Outer failure" }
}
assertEquals("Inner failure", result.block())
}
}
Environment:
- springframework: 6.1.2
- reactor-core: 3.6.1
- r2dbc-pool: 1.0.1
- r2dbc-postgresql: 1.0.3
The outcomes:
- Always see the following stacktrace:
java.lang.IllegalStateException: No value for key [ConnectionPool[PostgreSQL]] bound to context
at org.springframework.transaction.reactive.TransactionSynchronizationManager.unbindResource(TransactionSynchronizationManager.java:151) ~[spring-tx-6.1.2.jar:6.1.2]
at org.springframework.r2dbc.connection.R2dbcTransactionManager.lambda$doCleanupAfterCompletion$14(R2dbcTransactionManager.java:350) ~[spring-r2dbc-6.1.2.jar:6.1.2]
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:45) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:264) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:53) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:264) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.Mono.subscribe(Mono.java:4512) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:279) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.Operators.error(Operators.java:198) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:49) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:264) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:204) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2231) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:210) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2231) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:89) ~[reactor-core-3.6.1.jar:3.6.1]
at io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onComplete(FluxDiscardOnCancel.java:104) ~[r2dbc-postgresql-1.0.3.RELEASE.jar:1.0.3.RELEASE]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.FluxHandle$HandleSubscriber.onComplete(FluxHandle.java:223) ~[reactor-core-3.6.1.jar:3.6.1]
at io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onComplete(FluxDiscardOnCancel.java:104) ~[r2dbc-postgresql-1.0.3.RELEASE.jar:1.0.3.RELEASE]
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onComplete(FluxContextWrite.java:126) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.FluxCreate$BaseSink.complete(FluxCreate.java:465) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:871) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.FluxCreate$BufferAsyncSink.complete(FluxCreate.java:819) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.FluxCreate$SerializedFluxSink.drainLoop(FluxCreate.java:249) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.FluxCreate$SerializedFluxSink.drain(FluxCreate.java:215) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.FluxCreate$SerializedFluxSink.complete(FluxCreate.java:206) ~[reactor-core-3.6.1.jar:3.6.1]
at io.r2dbc.postgresql.client.ReactorNettyClient$Conversation.complete(ReactorNettyClient.java:668) ~[r2dbc-postgresql-1.0.3.RELEASE.jar:1.0.3.RELEASE]
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.emit(ReactorNettyClient.java:934) ~[r2dbc-postgresql-1.0.3.RELEASE.jar:1.0.3.RELEASE]
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:810) ~[r2dbc-postgresql-1.0.3.RELEASE.jar:1.0.3.RELEASE]
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:716) ~[r2dbc-postgresql-1.0.3.RELEASE.jar:1.0.3.RELEASE]
at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:129) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:294) ~[reactor-netty-core-1.1.14.jar:1.1.14]
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:403) ~[reactor-netty-core-1.1.14.jar:1.1.14]
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:426) ~[reactor-netty-core-1.1.14.jar:1.1.14]
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:114) ~[reactor-netty-core-1.1.14.jar:1.1.14]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[netty-codec-4.1.104.Final.jar:4.1.104.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[netty-codec-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.104.Final.jar:4.1.104.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.104.Final.jar:4.1.104.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.104.Final.jar:4.1.104.Final]
at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
- the exchange with the database is the following :
372 3.019813 127.0.0.1 56603 127.0.0.1 32799 PGSQL 110 >Q ----------> BEGIN ISOLATION LEVEL READ COMMITTED, READ WRITE
373 3.019838 127.0.0.1 32799 127.0.0.1 56603 TCP 56 32799 → 56603 [ACK] Seq=1 Ack=55 Win=6373 Len=0 TSval=3610326549 TSecr=3240910966
386 3.034130 127.0.0.1 32799 127.0.0.1 56603 PGSQL 73 <C/Z
387 3.034157 127.0.0.1 56603 127.0.0.1 32799 TCP 56 56603 → 32799 [ACK] Seq=55 Ack=18 Win=6370 Len=0 TSval=3240910980 TSecr=3610326563
434 3.038892 127.0.0.1 56603 127.0.0.1 32799 PGSQL 177 >Q ----------> INSERT INTO users_TimeoutsTest (userId, userName, email) VALUES (1000, 'test', '[email protected]') RETURNING userId
435 3.038927 127.0.0.1 32799 127.0.0.1 56603 TCP 56 32799 → 56603 [ACK] Seq=18 Ack=176 Win=6372 Len=0 TSval=3610326568 TSecr=3240910985
460 3.064530 127.0.0.1 32799 127.0.0.1 56603 PGSQL 125 <T/D/C/Z
461 3.064544 127.0.0.1 56603 127.0.0.1 32799 TCP 56 56603 → 32799 [ACK] Seq=176 Ack=87 Win=6369 Len=0 TSval=3240911010 TSecr=3610326593
711 3.093204 127.0.0.1 56601 127.0.0.1 32799 PGSQL 111 >Q ----------> BEGIN ISOLATION LEVEL REPEATABLE READ, READ WRITE (another connection)
712 3.093236 127.0.0.1 32799 127.0.0.1 56601 TCP 56 32799 → 56601 [ACK] Seq=1 Ack=56 Win=6373 Len=0 TSval=200857867 TSecr=298528505
729 3.104023 127.0.0.1 32799 127.0.0.1 56601 PGSQL 73 <C/Z
730 3.104031 127.0.0.1 56601 127.0.0.1 32799 TCP 56 56601 → 32799 [ACK] Seq=56 Ack=18 Win=6370 Len=0 TSval=298528516 TSecr=200857878
731 *REF* 127.0.0.1 56601 127.0.0.1 32799 PGSQL 177 >Q ----------> INSERT INTO users_TimeoutsTest (userId, userName, email) VALUES (1001, 'test', '[email protected]') RETURNING userId
732 0.000010 127.0.0.1 32799 127.0.0.1 56601 TCP 56 32799 → 56601 [ACK] Seq=18 Ack=177 Win=6372 Len=0 TSval=200857878 TSecr=298528516
1155 5.006252 127.0.0.1 56601 127.0.0.1 32799 PGSQL 70 >Q ----------> ROLLBACK (inner)
1156 5.006272 127.0.0.1 32799 127.0.0.1 56601 TCP 56 32799 → 56601 [ACK] Seq=18 Ack=191 Win=6371 Len=0 TSval=200862884 TSecr=298533522
1179 5.017974 127.0.0.1 56603 127.0.0.1 32799 PGSQL 68 >Q ----------> COMMIT (outer)
1180 5.017995 127.0.0.1 32799 127.0.0.1 56603 TCP 56 32799 → 56603 [ACK] Seq=87 Ack=188 Win=6371 Len=0 TSval=3610331651 TSecr=3240916068
1186 5.047064 127.0.0.1 32799 127.0.0.1 56603 PGSQL 74 <C/Z
1187 5.047120 127.0.0.1 56603 127.0.0.1 32799 TCP 56 56603 → 32799 [ACK] Seq=188 Ack=105 Win=6369 Len=0 TSval=3240916098 TSecr=3610331681
1190 5.050271 127.0.0.1 32799 127.0.0.1 56601 PGSQL 289 <E/Z
1191 5.050313 127.0.0.1 56601 127.0.0.1 32799 TCP 56 56601 → 32799 [ACK] Seq=191 Ack=251 Win=6367 Len=0 TSval=298533567 TSecr=200862929
1204 5.054634 127.0.0.1 32799 127.0.0.1 56601 PGSQL 76 <C/Z
- Once in a while the test fails: the chain produces «Outer failure» (some concurrency issue?)
- The last and the worst: Should the RepeatedTest is used the test hangs after POOL_SIZE repetitions because a connection can’t be gained from the pool. The screenshot with the state of the pool below suggests that one of two connections is not returned into the pool during every test run.