Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions rsocket-core/src/main/java/io/rsocket/core/ServerSetup.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import io.rsocket.resume.*;
import java.nio.channels.ClosedChannelException;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;
import java.util.function.Function;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -96,6 +98,8 @@ static class ResumableServerSetup extends ServerSetup {
private final Duration resumeStreamTimeout;
private final Function<? super ByteBuf, ? extends ResumableFramesStore> resumeStoreFactory;
private final boolean cleanupStoreOnKeepAlive;
private final Map<DuplexConnection, DuplexConnection> rawConnections =
new ConcurrentHashMap<>();

ResumableServerSetup(
Duration timeout,
Expand Down Expand Up @@ -136,6 +140,8 @@ public Mono<Void> acceptRSocketSetup(

sessionManager.save(serverRSocketSession, resumeToken);

rawConnections.put(resumableDuplexConnection, duplexConnection);

return then.apply(
new ResumableKeepAliveHandler(
resumableDuplexConnection, serverRSocketSession, serverRSocketSession),
Expand All @@ -145,6 +151,21 @@ public Mono<Void> acceptRSocketSetup(
}
}

@Override
void sendError(DuplexConnection duplexConnection, RSocketErrorException exception) {
DuplexConnection rawConnection = rawConnections.remove(duplexConnection);
if (rawConnection != null) {
// Send the error directly on the raw connection, bypassing ResumableDuplexConnection
// which would buffer the frame instead of sending it to the client
rawConnection.sendErrorAndClose(exception);
rawConnection.receive().subscribe();
// Dispose the resumable connection to clean up the session
duplexConnection.dispose();
} else {
super.sendError(duplexConnection, exception);
}
}

@Override
public Mono<Void> acceptRSocketResume(ByteBuf frame, DuplexConnection duplexConnection) {
ServerRSocketSession session = sessionManager.get(ResumeFrameCodec.token(frame));
Expand Down
32 changes: 32 additions & 0 deletions rsocket-core/src/test/java/io/rsocket/core/RSocketServerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -198,4 +198,36 @@ public void ensuresErrorFrameDeliveredPriorConnectionDisposal() {
server.dispose();
transport.alloc().assertHasNoLeaks();
}

@Test
public void ensuresErrorFrameDeliveredWithResumeEnabled() {
TestServerTransport transport = new TestServerTransport();
Closeable server =
RSocketServer.create()
.resume(new Resume())
.acceptor(
(setup, sendingSocket) -> Mono.error(new RejectedSetupException("ACCESS_DENIED")))
.bind(transport)
.block();

TestDuplexConnection connection = transport.connect();
connection.addToReceivedBuffer(
SetupFrameCodec.encode(
ByteBufAllocator.DEFAULT,
false,
0,
1,
Unpooled.wrappedBuffer("test-resume-token".getBytes()),
"metadata_type",
"data_type",
EmptyPayload.INSTANCE));

StepVerifier.create(connection.onClose()).expectComplete().verify(Duration.ofSeconds(30));
FrameAssert.assertThat(connection.pollFrame())
.hasStreamIdZero()
.hasData("ACCESS_DENIED")
.hasNoLeaks();
server.dispose();
transport.alloc().assertHasNoLeaks();
}
}