From 0b3a9a52b59775767a6894d8988a00668bcac2e2 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Fri, 31 Jan 2025 11:13:34 +0200 Subject: [PATCH 1/6] ensures connection is close on keepalive timeout Signed-off-by: Oleh Dokuka --- .../io/rsocket/core/RSocketRequester.java | 13 +- .../io/rsocket/integration/KeepaliveTest.java | 178 ++++++++++++++++++ 2 files changed, 188 insertions(+), 3 deletions(-) create mode 100644 rsocket-transport-netty/src/test/java/io/rsocket/integration/KeepaliveTest.java diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java index 9e8d349bf..fb2198a45 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java @@ -309,9 +309,16 @@ private void handleMissingResponseProcessor(int streamId, FrameType type, ByteBu private void tryTerminateOnKeepAlive(KeepAliveSupport.KeepAlive keepAlive) { tryTerminate( - () -> - new ConnectionErrorException( - String.format("No keep-alive acks for %d ms", keepAlive.getTimeout().toMillis()))); + () -> { + ConnectionErrorException exception = new ConnectionErrorException(String.format( + "No keep-alive acks for %d ms", + keepAlive.getTimeout() + .toMillis())); + + + getDuplexConnection().dispose(); + return exception; + }); } private void tryShutdown(Throwable e) { diff --git a/rsocket-transport-netty/src/test/java/io/rsocket/integration/KeepaliveTest.java b/rsocket-transport-netty/src/test/java/io/rsocket/integration/KeepaliveTest.java new file mode 100644 index 000000000..48257146d --- /dev/null +++ b/rsocket-transport-netty/src/test/java/io/rsocket/integration/KeepaliveTest.java @@ -0,0 +1,178 @@ +package io.rsocket.integration; + +import java.time.Duration; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; + +import io.rsocket.Payload; +import io.rsocket.RSocket; +import io.rsocket.core.RSocketClient; +import io.rsocket.core.RSocketConnector; +import io.rsocket.core.RSocketServer; +import io.rsocket.frame.decoder.PayloadDecoder; +import io.rsocket.transport.netty.client.TcpClientTransport; +import io.rsocket.transport.netty.server.CloseableChannel; +import io.rsocket.transport.netty.server.TcpServerTransport; +import io.rsocket.util.DefaultPayload; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.netty.tcp.TcpClient; +import reactor.netty.tcp.TcpServer; +import reactor.test.StepVerifier; +import reactor.util.retry.Retry; +import reactor.util.retry.RetryBackoffSpec; + +public class KeepaliveTest { + + private static final Logger LOG = LoggerFactory.getLogger(KeepaliveTest.class); + private static final int PORT = 23200; + + @Test + void keepAliveTest() { + createServer().block(); + RSocketClient rsocketClient = createClient(); + + int expectedCount = 4; + AtomicBoolean sleepOnce = new AtomicBoolean(true); + StepVerifier.create( + Flux.range(0, expectedCount) + .delayElements(Duration.ofMillis(2000)) + .concatMap(i -> + rsocketClient.requestResponse(Mono.just(DefaultPayload.create(""))) + .doOnNext(__ -> { + if (sleepOnce.getAndSet(false)) { + try { + LOG.info("Sleeping..."); + Thread.sleep(1_000); + LOG.info("Waking up."); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }) + .log("id " + i) + .onErrorComplete() + )) + .expectSubscription() + .expectNextCount(expectedCount) + .verifyComplete(); + } + + @Test + void keepAliveTestLazy() { + createServer().block(); + Mono rsocketMono = createClientLazy(); + + int expectedCount = 4; + AtomicBoolean sleepOnce = new AtomicBoolean(true); + StepVerifier.create( + Flux.range(0, expectedCount) + .delayElements(Duration.ofMillis(2000)) + .concatMap(i -> + rsocketMono.flatMap(rsocket -> rsocket.requestResponse(DefaultPayload.create("")) + .doOnNext(__ -> { + if (sleepOnce.getAndSet(false)) { + try { + LOG.info("Sleeping..."); + Thread.sleep(1_000); + LOG.info("Waking up."); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }) + .log("id " + i) + .onErrorComplete() + ) + )) + .expectSubscription() + .expectNextCount(expectedCount) + .verifyComplete(); + } + + private static Mono createServer() { + LOG.info("Starting server at port {}", PORT); + + TcpServer tcpServer = TcpServer.create().host("localhost").port(PORT); + + return RSocketServer.create((setupPayload, rSocket) -> { + rSocket.onClose() + .doFirst(() -> LOG.info("Connected on server side.")) + .doOnTerminate(() -> LOG.info("Connection closed on server side.")) + .subscribe(); + + return Mono.just(new MyServerRsocket()); + }) + .payloadDecoder(PayloadDecoder.ZERO_COPY) + .bind(TcpServerTransport.create(tcpServer)) + .doOnNext(closeableChannel -> LOG.info("RSocket server started.")); + } + + private static RSocketClient createClient() { + LOG.info("Connecting...."); + + Function reconnectSpec = reason -> Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(10L)) + .doBeforeRetry(retrySignal -> LOG.info("Reconnecting. Reason: {}", reason)); + + Mono rsocketMono = RSocketConnector.create() + .fragment(16384) + .reconnect(reconnectSpec.apply("connector-close")) + .keepAlive(Duration.ofMillis(100L), Duration.ofMillis(900L)) + .connect(TcpClientTransport.create(TcpClient.create().host("localhost").port(PORT))); + + RSocketClient client = RSocketClient.from(rsocketMono); + + client + .source() + .doOnNext(r -> LOG.info("Got RSocket")) + .flatMap(RSocket::onClose) + .doOnError(err -> LOG.error("Error during onClose.", err)) + .retryWhen(reconnectSpec.apply("client-close")) + .doFirst(() -> LOG.info("Connected on client side.")) + .doOnTerminate(() -> LOG.info("Connection closed on client side.")) + .repeat() + .subscribe(); + + return client; + } + + + private static Mono createClientLazy() { + LOG.info("Connecting...."); + + Function reconnectSpec = reason -> Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(10L)) + .doBeforeRetry(retrySignal -> LOG.info("Reconnecting. Reason: {}", reason)); + + return RSocketConnector.create() + .fragment(16384) + .reconnect(reconnectSpec.apply("connector-close")) + .keepAlive(Duration.ofMillis(100L), Duration.ofMillis(900L)) + .connect(TcpClientTransport.create(TcpClient.create().host("localhost").port(PORT))); + +// RSocketClient client = RSocketClient.from(rsocketMono); + +// client +// .source() +// .doOnNext(r -> LOG.info("Got RSocket")) +// .flatMap(RSocket::onClose) +// .doOnError(err -> LOG.error("Error during onClose.", err)) +// .retryWhen(reconnectSpec.apply("client-close")) +// .doFirst(() -> LOG.info("Connected on client side.")) +// .doOnTerminate(() -> LOG.info("Connection closed on client side.")) +// .repeat() +// .subscribe(); + +// return client; + } + + public static class MyServerRsocket implements RSocket { + + @Override + public Mono requestResponse(Payload payload) { + return Mono.just("Pong").map(DefaultPayload::create); + } + } +} \ No newline at end of file From 6ff8f7100496394337b2ec3d4b4ca19d21612ed5 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Fri, 31 Jan 2025 11:26:47 +0200 Subject: [PATCH 2/6] fix format Signed-off-by: Oleh Dokuka --- .../io/rsocket/core/RSocketRequester.java | 8 +- .../io/rsocket/integration/KeepaliveTest.java | 316 +++++++++--------- .../src/test/resources/logback-test.xml | 1 - 3 files changed, 166 insertions(+), 159 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java index fb2198a45..26f052780 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java @@ -310,11 +310,9 @@ private void handleMissingResponseProcessor(int streamId, FrameType type, ByteBu private void tryTerminateOnKeepAlive(KeepAliveSupport.KeepAlive keepAlive) { tryTerminate( () -> { - ConnectionErrorException exception = new ConnectionErrorException(String.format( - "No keep-alive acks for %d ms", - keepAlive.getTimeout() - .toMillis())); - + ConnectionErrorException exception = + new ConnectionErrorException( + String.format("No keep-alive acks for %d ms", keepAlive.getTimeout().toMillis())); getDuplexConnection().dispose(); return exception; diff --git a/rsocket-transport-netty/src/test/java/io/rsocket/integration/KeepaliveTest.java b/rsocket-transport-netty/src/test/java/io/rsocket/integration/KeepaliveTest.java index 48257146d..48d70f654 100644 --- a/rsocket-transport-netty/src/test/java/io/rsocket/integration/KeepaliveTest.java +++ b/rsocket-transport-netty/src/test/java/io/rsocket/integration/KeepaliveTest.java @@ -1,9 +1,5 @@ package io.rsocket.integration; -import java.time.Duration; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Function; - import io.rsocket.Payload; import io.rsocket.RSocket; import io.rsocket.core.RSocketClient; @@ -14,6 +10,9 @@ import io.rsocket.transport.netty.server.CloseableChannel; import io.rsocket.transport.netty.server.TcpServerTransport; import io.rsocket.util.DefaultPayload; +import java.time.Duration; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,152 +26,163 @@ public class KeepaliveTest { - private static final Logger LOG = LoggerFactory.getLogger(KeepaliveTest.class); - private static final int PORT = 23200; - - @Test - void keepAliveTest() { - createServer().block(); - RSocketClient rsocketClient = createClient(); - - int expectedCount = 4; - AtomicBoolean sleepOnce = new AtomicBoolean(true); - StepVerifier.create( - Flux.range(0, expectedCount) - .delayElements(Duration.ofMillis(2000)) - .concatMap(i -> - rsocketClient.requestResponse(Mono.just(DefaultPayload.create(""))) - .doOnNext(__ -> { - if (sleepOnce.getAndSet(false)) { - try { - LOG.info("Sleeping..."); - Thread.sleep(1_000); - LOG.info("Waking up."); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - }) - .log("id " + i) - .onErrorComplete() - )) - .expectSubscription() - .expectNextCount(expectedCount) - .verifyComplete(); - } - - @Test - void keepAliveTestLazy() { - createServer().block(); - Mono rsocketMono = createClientLazy(); - - int expectedCount = 4; - AtomicBoolean sleepOnce = new AtomicBoolean(true); - StepVerifier.create( - Flux.range(0, expectedCount) - .delayElements(Duration.ofMillis(2000)) - .concatMap(i -> - rsocketMono.flatMap(rsocket -> rsocket.requestResponse(DefaultPayload.create("")) - .doOnNext(__ -> { - if (sleepOnce.getAndSet(false)) { - try { - LOG.info("Sleeping..."); - Thread.sleep(1_000); - LOG.info("Waking up."); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - }) - .log("id " + i) - .onErrorComplete() - ) - )) - .expectSubscription() - .expectNextCount(expectedCount) - .verifyComplete(); - } - - private static Mono createServer() { - LOG.info("Starting server at port {}", PORT); - - TcpServer tcpServer = TcpServer.create().host("localhost").port(PORT); - - return RSocketServer.create((setupPayload, rSocket) -> { - rSocket.onClose() - .doFirst(() -> LOG.info("Connected on server side.")) - .doOnTerminate(() -> LOG.info("Connection closed on server side.")) - .subscribe(); - - return Mono.just(new MyServerRsocket()); - }) - .payloadDecoder(PayloadDecoder.ZERO_COPY) - .bind(TcpServerTransport.create(tcpServer)) - .doOnNext(closeableChannel -> LOG.info("RSocket server started.")); - } - - private static RSocketClient createClient() { - LOG.info("Connecting...."); - - Function reconnectSpec = reason -> Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(10L)) - .doBeforeRetry(retrySignal -> LOG.info("Reconnecting. Reason: {}", reason)); - - Mono rsocketMono = RSocketConnector.create() - .fragment(16384) - .reconnect(reconnectSpec.apply("connector-close")) - .keepAlive(Duration.ofMillis(100L), Duration.ofMillis(900L)) - .connect(TcpClientTransport.create(TcpClient.create().host("localhost").port(PORT))); - - RSocketClient client = RSocketClient.from(rsocketMono); - - client - .source() - .doOnNext(r -> LOG.info("Got RSocket")) - .flatMap(RSocket::onClose) - .doOnError(err -> LOG.error("Error during onClose.", err)) - .retryWhen(reconnectSpec.apply("client-close")) - .doFirst(() -> LOG.info("Connected on client side.")) - .doOnTerminate(() -> LOG.info("Connection closed on client side.")) - .repeat() - .subscribe(); - - return client; - } - - - private static Mono createClientLazy() { - LOG.info("Connecting...."); - - Function reconnectSpec = reason -> Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(10L)) - .doBeforeRetry(retrySignal -> LOG.info("Reconnecting. Reason: {}", reason)); - - return RSocketConnector.create() - .fragment(16384) - .reconnect(reconnectSpec.apply("connector-close")) - .keepAlive(Duration.ofMillis(100L), Duration.ofMillis(900L)) - .connect(TcpClientTransport.create(TcpClient.create().host("localhost").port(PORT))); - -// RSocketClient client = RSocketClient.from(rsocketMono); - -// client -// .source() -// .doOnNext(r -> LOG.info("Got RSocket")) -// .flatMap(RSocket::onClose) -// .doOnError(err -> LOG.error("Error during onClose.", err)) -// .retryWhen(reconnectSpec.apply("client-close")) -// .doFirst(() -> LOG.info("Connected on client side.")) -// .doOnTerminate(() -> LOG.info("Connection closed on client side.")) -// .repeat() -// .subscribe(); - -// return client; - } - - public static class MyServerRsocket implements RSocket { - - @Override - public Mono requestResponse(Payload payload) { - return Mono.just("Pong").map(DefaultPayload::create); - } - } -} \ No newline at end of file + private static final Logger LOG = LoggerFactory.getLogger(KeepaliveTest.class); + private static final int PORT = 23200; + + @Test + void keepAliveTest() { + createServer().block(); + RSocketClient rsocketClient = createClient(); + + int expectedCount = 4; + AtomicBoolean sleepOnce = new AtomicBoolean(true); + StepVerifier.create( + Flux.range(0, expectedCount) + .delayElements(Duration.ofMillis(2000)) + .concatMap( + i -> + rsocketClient + .requestResponse(Mono.just(DefaultPayload.create(""))) + .doOnNext( + __ -> { + if (sleepOnce.getAndSet(false)) { + try { + LOG.info("Sleeping..."); + Thread.sleep(1_000); + LOG.info("Waking up."); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }) + .log("id " + i) + .onErrorComplete())) + .expectSubscription() + .expectNextCount(expectedCount) + .verifyComplete(); + } + + @Test + void keepAliveTestLazy() { + createServer().block(); + Mono rsocketMono = createClientLazy(); + + int expectedCount = 4; + AtomicBoolean sleepOnce = new AtomicBoolean(true); + StepVerifier.create( + Flux.range(0, expectedCount) + .delayElements(Duration.ofMillis(2000)) + .concatMap( + i -> + rsocketMono.flatMap( + rsocket -> + rsocket + .requestResponse(DefaultPayload.create("")) + .doOnNext( + __ -> { + if (sleepOnce.getAndSet(false)) { + try { + LOG.info("Sleeping..."); + Thread.sleep(1_000); + LOG.info("Waking up."); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }) + .log("id " + i) + .onErrorComplete()))) + .expectSubscription() + .expectNextCount(expectedCount) + .verifyComplete(); + } + + private static Mono createServer() { + LOG.info("Starting server at port {}", PORT); + + TcpServer tcpServer = TcpServer.create().host("localhost").port(PORT); + + return RSocketServer.create( + (setupPayload, rSocket) -> { + rSocket + .onClose() + .doFirst(() -> LOG.info("Connected on server side.")) + .doOnTerminate(() -> LOG.info("Connection closed on server side.")) + .subscribe(); + + return Mono.just(new MyServerRsocket()); + }) + .payloadDecoder(PayloadDecoder.ZERO_COPY) + .bind(TcpServerTransport.create(tcpServer)) + .doOnNext(closeableChannel -> LOG.info("RSocket server started.")); + } + + private static RSocketClient createClient() { + LOG.info("Connecting...."); + + Function reconnectSpec = + reason -> + Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(10L)) + .doBeforeRetry(retrySignal -> LOG.info("Reconnecting. Reason: {}", reason)); + + Mono rsocketMono = + RSocketConnector.create() + .fragment(16384) + .reconnect(reconnectSpec.apply("connector-close")) + .keepAlive(Duration.ofMillis(100L), Duration.ofMillis(900L)) + .connect(TcpClientTransport.create(TcpClient.create().host("localhost").port(PORT))); + + RSocketClient client = RSocketClient.from(rsocketMono); + + client + .source() + .doOnNext(r -> LOG.info("Got RSocket")) + .flatMap(RSocket::onClose) + .doOnError(err -> LOG.error("Error during onClose.", err)) + .retryWhen(reconnectSpec.apply("client-close")) + .doFirst(() -> LOG.info("Connected on client side.")) + .doOnTerminate(() -> LOG.info("Connection closed on client side.")) + .repeat() + .subscribe(); + + return client; + } + + private static Mono createClientLazy() { + LOG.info("Connecting...."); + + Function reconnectSpec = + reason -> + Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(10L)) + .doBeforeRetry(retrySignal -> LOG.info("Reconnecting. Reason: {}", reason)); + + return RSocketConnector.create() + .fragment(16384) + .reconnect(reconnectSpec.apply("connector-close")) + .keepAlive(Duration.ofMillis(100L), Duration.ofMillis(900L)) + .connect(TcpClientTransport.create(TcpClient.create().host("localhost").port(PORT))); + + // RSocketClient client = RSocketClient.from(rsocketMono); + + // client + // .source() + // .doOnNext(r -> LOG.info("Got RSocket")) + // .flatMap(RSocket::onClose) + // .doOnError(err -> LOG.error("Error during onClose.", err)) + // .retryWhen(reconnectSpec.apply("client-close")) + // .doFirst(() -> LOG.info("Connected on client side.")) + // .doOnTerminate(() -> LOG.info("Connection closed on client side.")) + // .repeat() + // .subscribe(); + + // return client; + } + + public static class MyServerRsocket implements RSocket { + + @Override + public Mono requestResponse(Payload payload) { + return Mono.just("Pong").map(DefaultPayload::create); + } + } +} diff --git a/rsocket-transport-netty/src/test/resources/logback-test.xml b/rsocket-transport-netty/src/test/resources/logback-test.xml index b42db6df6..981d6d0b6 100644 --- a/rsocket-transport-netty/src/test/resources/logback-test.xml +++ b/rsocket-transport-netty/src/test/resources/logback-test.xml @@ -27,7 +27,6 @@ - From 1501c767c0512b88f273fe71c895bfd33734a9d7 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Fri, 31 Jan 2025 12:28:14 +0200 Subject: [PATCH 3/6] improve KeepaliveTest Signed-off-by: Oleh Dokuka --- .../io/rsocket/integration/KeepaliveTest.java | 33 +++++++++---------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/rsocket-transport-netty/src/test/java/io/rsocket/integration/KeepaliveTest.java b/rsocket-transport-netty/src/test/java/io/rsocket/integration/KeepaliveTest.java index 48d70f654..b2db0c04f 100644 --- a/rsocket-transport-netty/src/test/java/io/rsocket/integration/KeepaliveTest.java +++ b/rsocket-transport-netty/src/test/java/io/rsocket/integration/KeepaliveTest.java @@ -13,6 +13,9 @@ import java.time.Duration; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,9 +32,21 @@ public class KeepaliveTest { private static final Logger LOG = LoggerFactory.getLogger(KeepaliveTest.class); private static final int PORT = 23200; + private CloseableChannel server; + + @BeforeEach + void setUp() { + server = createServer().block(); + } + + @AfterEach + void tearDown() { + server.dispose(); + server.onClose().block(); + } + @Test void keepAliveTest() { - createServer().block(); RSocketClient rsocketClient = createClient(); int expectedCount = 4; @@ -64,7 +79,6 @@ void keepAliveTest() { @Test void keepAliveTestLazy() { - createServer().block(); Mono rsocketMono = createClientLazy(); int expectedCount = 4; @@ -161,21 +175,6 @@ private static Mono createClientLazy() { .reconnect(reconnectSpec.apply("connector-close")) .keepAlive(Duration.ofMillis(100L), Duration.ofMillis(900L)) .connect(TcpClientTransport.create(TcpClient.create().host("localhost").port(PORT))); - - // RSocketClient client = RSocketClient.from(rsocketMono); - - // client - // .source() - // .doOnNext(r -> LOG.info("Got RSocket")) - // .flatMap(RSocket::onClose) - // .doOnError(err -> LOG.error("Error during onClose.", err)) - // .retryWhen(reconnectSpec.apply("client-close")) - // .doFirst(() -> LOG.info("Connected on client side.")) - // .doOnTerminate(() -> LOG.info("Connection closed on client side.")) - // .repeat() - // .subscribe(); - - // return client; } public static class MyServerRsocket implements RSocket { From ff740b38df944d10fc370d19831beb5818abc612 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Fri, 31 Jan 2025 12:31:45 +0200 Subject: [PATCH 4/6] fix format and failing test Signed-off-by: Oleh Dokuka --- .../main/java/io/rsocket/core/RSocketRequester.java | 12 ++++-------- .../java/io/rsocket/integration/KeepaliveTest.java | 1 - 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java index 26f052780..b8a9c00ff 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java @@ -309,14 +309,10 @@ private void handleMissingResponseProcessor(int streamId, FrameType type, ByteBu private void tryTerminateOnKeepAlive(KeepAliveSupport.KeepAlive keepAlive) { tryTerminate( - () -> { - ConnectionErrorException exception = - new ConnectionErrorException( - String.format("No keep-alive acks for %d ms", keepAlive.getTimeout().toMillis())); - - getDuplexConnection().dispose(); - return exception; - }); + () -> + new ConnectionErrorException( + String.format("No keep-alive acks for %d ms", keepAlive.getTimeout().toMillis()))); + getDuplexConnection().dispose(); } private void tryShutdown(Throwable e) { diff --git a/rsocket-transport-netty/src/test/java/io/rsocket/integration/KeepaliveTest.java b/rsocket-transport-netty/src/test/java/io/rsocket/integration/KeepaliveTest.java index b2db0c04f..cfa29aa7c 100644 --- a/rsocket-transport-netty/src/test/java/io/rsocket/integration/KeepaliveTest.java +++ b/rsocket-transport-netty/src/test/java/io/rsocket/integration/KeepaliveTest.java @@ -13,7 +13,6 @@ import java.time.Duration; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; - import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; From a8dd315d0a2ad167df7e41e6d41397355c46ba8e Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Fri, 31 Jan 2025 12:34:24 +0200 Subject: [PATCH 5/6] adds reference to the original GH issue Signed-off-by: Oleh Dokuka --- .../src/test/java/io/rsocket/integration/KeepaliveTest.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/rsocket-transport-netty/src/test/java/io/rsocket/integration/KeepaliveTest.java b/rsocket-transport-netty/src/test/java/io/rsocket/integration/KeepaliveTest.java index cfa29aa7c..c2f2d874c 100644 --- a/rsocket-transport-netty/src/test/java/io/rsocket/integration/KeepaliveTest.java +++ b/rsocket-transport-netty/src/test/java/io/rsocket/integration/KeepaliveTest.java @@ -26,6 +26,10 @@ import reactor.util.retry.Retry; import reactor.util.retry.RetryBackoffSpec; +/** + * Test case that reproduces the following + * GitHub Issue + */ public class KeepaliveTest { private static final Logger LOG = LoggerFactory.getLogger(KeepaliveTest.class); From b30704bcb307b2a44b6bd97c6e24e4f47f7faea0 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Fri, 31 Jan 2025 13:31:14 +0200 Subject: [PATCH 6/6] fixes google format Signed-off-by: Oleh Dokuka --- .../src/test/java/io/rsocket/integration/KeepaliveTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rsocket-transport-netty/src/test/java/io/rsocket/integration/KeepaliveTest.java b/rsocket-transport-netty/src/test/java/io/rsocket/integration/KeepaliveTest.java index c2f2d874c..f05713215 100644 --- a/rsocket-transport-netty/src/test/java/io/rsocket/integration/KeepaliveTest.java +++ b/rsocket-transport-netty/src/test/java/io/rsocket/integration/KeepaliveTest.java @@ -27,8 +27,8 @@ import reactor.util.retry.RetryBackoffSpec; /** - * Test case that reproduces the following - * GitHub Issue + * Test case that reproduces the following GitHub Issue */ public class KeepaliveTest {