Skip to content

Commit 46b8cc3

Browse files
wdrosteclaude
andcommitted
Fix Resume preventing error frame on RejectedSetupException (#1121)
When Resume was configured, RejectedSetupException errors were queued in the ResumableDuplexConnection's buffer instead of being sent directly to the client. The error frame now bypasses resume buffering during setup rejection, ensuring clients receive the rejection and connections close properly. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent fc64b43 commit 46b8cc3

File tree

2 files changed

+53
-0
lines changed

2 files changed

+53
-0
lines changed

rsocket-core/src/main/java/io/rsocket/core/ServerSetup.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import io.rsocket.resume.*;
3030
import java.nio.channels.ClosedChannelException;
3131
import java.time.Duration;
32+
import java.util.Map;
33+
import java.util.concurrent.ConcurrentHashMap;
3234
import java.util.function.BiFunction;
3335
import java.util.function.Function;
3436
import reactor.core.publisher.Mono;
@@ -96,6 +98,8 @@ static class ResumableServerSetup extends ServerSetup {
9698
private final Duration resumeStreamTimeout;
9799
private final Function<? super ByteBuf, ? extends ResumableFramesStore> resumeStoreFactory;
98100
private final boolean cleanupStoreOnKeepAlive;
101+
private final Map<DuplexConnection, DuplexConnection> rawConnections =
102+
new ConcurrentHashMap<>();
99103

100104
ResumableServerSetup(
101105
Duration timeout,
@@ -136,6 +140,8 @@ public Mono<Void> acceptRSocketSetup(
136140

137141
sessionManager.save(serverRSocketSession, resumeToken);
138142

143+
rawConnections.put(resumableDuplexConnection, duplexConnection);
144+
139145
return then.apply(
140146
new ResumableKeepAliveHandler(
141147
resumableDuplexConnection, serverRSocketSession, serverRSocketSession),
@@ -145,6 +151,21 @@ public Mono<Void> acceptRSocketSetup(
145151
}
146152
}
147153

154+
@Override
155+
void sendError(DuplexConnection duplexConnection, RSocketErrorException exception) {
156+
DuplexConnection rawConnection = rawConnections.remove(duplexConnection);
157+
if (rawConnection != null) {
158+
// Send the error directly on the raw connection, bypassing ResumableDuplexConnection
159+
// which would buffer the frame instead of sending it to the client
160+
rawConnection.sendErrorAndClose(exception);
161+
rawConnection.receive().subscribe();
162+
// Dispose the resumable connection to clean up the session
163+
duplexConnection.dispose();
164+
} else {
165+
super.sendError(duplexConnection, exception);
166+
}
167+
}
168+
148169
@Override
149170
public Mono<Void> acceptRSocketResume(ByteBuf frame, DuplexConnection duplexConnection) {
150171
ServerRSocketSession session = sessionManager.get(ResumeFrameCodec.token(frame));

rsocket-core/src/test/java/io/rsocket/core/RSocketServerTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,4 +198,36 @@ public void ensuresErrorFrameDeliveredPriorConnectionDisposal() {
198198
server.dispose();
199199
transport.alloc().assertHasNoLeaks();
200200
}
201+
202+
@Test
203+
public void ensuresErrorFrameDeliveredWithResumeEnabled() {
204+
TestServerTransport transport = new TestServerTransport();
205+
Closeable server =
206+
RSocketServer.create()
207+
.resume(new Resume())
208+
.acceptor(
209+
(setup, sendingSocket) -> Mono.error(new RejectedSetupException("ACCESS_DENIED")))
210+
.bind(transport)
211+
.block();
212+
213+
TestDuplexConnection connection = transport.connect();
214+
connection.addToReceivedBuffer(
215+
SetupFrameCodec.encode(
216+
ByteBufAllocator.DEFAULT,
217+
false,
218+
0,
219+
1,
220+
Unpooled.wrappedBuffer("test-resume-token".getBytes()),
221+
"metadata_type",
222+
"data_type",
223+
EmptyPayload.INSTANCE));
224+
225+
StepVerifier.create(connection.onClose()).expectComplete().verify(Duration.ofSeconds(30));
226+
FrameAssert.assertThat(connection.pollFrame())
227+
.hasStreamIdZero()
228+
.hasData("ACCESS_DENIED")
229+
.hasNoLeaks();
230+
server.dispose();
231+
transport.alloc().assertHasNoLeaks();
232+
}
201233
}

0 commit comments

Comments
 (0)