Skip to content

Commit 4350abd

Browse files
committed
Fix NetSocket (and QuicStream) where the end signal is notified before all messages have been dispatched.
Motivation: The NetSocket implementation directly write the end sentinel message to the pending message queue, ignoging the potentially buffered messages in the connection. Changes: When we consider the message stream ends, we should instead model the last message as a connection message to ensure no reorder happens.
1 parent 361ee4d commit 4350abd

File tree

6 files changed

+73
-6
lines changed

6 files changed

+73
-6
lines changed

vertx-core/src/main/java/io/vertx/core/net/impl/SocketBase.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -270,14 +270,17 @@ protected void handleShutdown(Duration timeout, ChannelPromise promise) {
270270
}
271271
}
272272

273-
protected void handleEnd() {
274-
pending.write(InboundBuffer.END_SENTINEL);
273+
protected void handleEnded() {
274+
read(InboundBuffer.END_SENTINEL);
275+
endRead();
275276
}
276277

277278
@Override
278279
protected void handleMessage(Object msg) {
279280
MessageHandler handler = messageHandler();
280-
if (handler.accept(msg)) {
281+
if (msg == InboundBuffer.END_SENTINEL) {
282+
pending.write(InboundBuffer.END_SENTINEL);
283+
} else if (handler.accept(msg)) {
281284
pending.write(handler.transform(msg));
282285
} else {
283286
if (msg instanceof ReferenceCounted) {

vertx-core/src/main/java/io/vertx/core/net/impl/VertxConnection.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,10 @@ final void read(Object msg) {
308308
}
309309
}
310310

311+
final void endRead() {
312+
read = false;
313+
}
314+
311315
private void addPending(Object msg) {
312316
if (pending == null) {
313317
pending = new ArrayDeque<>();

vertx-core/src/main/java/io/vertx/core/net/impl/quic/QuicStreamImpl.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
*/
1111
package io.vertx.core.net.impl.quic;
1212

13-
import io.netty.buffer.Unpooled;
1413
import io.netty.channel.ChannelFuture;
1514
import io.netty.channel.ChannelHandlerContext;
1615
import io.netty.channel.socket.ChannelInputShutdownEvent;
@@ -149,7 +148,7 @@ protected void handleIdle(IdleStateEvent event) {
149148
@Override
150149
protected void handleEvent(Object event) {
151150
if (event == ChannelInputShutdownEvent.INSTANCE) {
152-
handleEnd();
151+
handleEnded();
153152
} else {
154153
super.handleEvent(event);
155154
}

vertx-core/src/main/java/io/vertx/core/net/impl/tcp/NetSocketImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ public String applicationLayerProtocol() {
180180

181181
@Override
182182
protected void handleClosed() {
183-
handleEnd();
183+
handleEnded();
184184
super.handleClosed();
185185
}
186186
}

vertx-core/src/test/java/io/vertx/tests/net/NetTest.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,37 @@ protected void tearDown() throws Exception {
132132
super.tearDown();
133133
}
134134

135+
@Test
136+
public void testEndHandlerCalledAfterAllEmissions() {
137+
Buffer buffer = TestUtils.randomBuffer(1024 * 1024);
138+
server = vertx.createNetServer().connectHandler(so -> {
139+
so.end(buffer);
140+
so.close();
141+
});
142+
server.listen(1234).await();
143+
NetClient client = vertx.createNetClient();
144+
AtomicInteger received = new AtomicInteger();
145+
AtomicInteger ended = new AtomicInteger();
146+
client.connect(1234, "localhost").onComplete(ar -> {
147+
if (ar.succeeded()) {
148+
NetSocket socket = ar.result();
149+
socket.handler(buf -> {
150+
int amount = received.addAndGet(buf.length());
151+
assertEquals(0, ended.get());
152+
socket.pause();
153+
vertx.setTimer(50, t -> {
154+
socket.resume();
155+
});
156+
});
157+
socket.endHandler(v -> {
158+
assertEquals(0, ended.getAndIncrement());
159+
});
160+
}
161+
});
162+
assertWaitUntil(() -> received.get() == buffer.length());
163+
assertWaitUntil(() -> ended.get() > 0);
164+
}
165+
135166
@Test
136167
public void testClientOptions() {
137168
NetClientOptions options = new NetClientOptions();

vertx-core/src/test/java/io/vertx/tests/net/quic/QuicClientTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -432,4 +432,34 @@ public void testInvalidApplicationProtocols() {
432432
}
433433
}
434434
}
435+
436+
@Test
437+
public void testEndHandlerCalledAfterAllEmissions() throws Exception {
438+
Buffer buffer = TestUtils.randomBuffer(1024 * 1024);
439+
server.handler(conn -> {
440+
conn.handler(stream -> {
441+
stream.endHandler(v -> {
442+
stream.end(buffer);
443+
});
444+
});
445+
});
446+
server.bind(SocketAddress.inetSocketAddress(9999, "localhost")).await();
447+
client.bind(SocketAddress.inetSocketAddress(0, "localhost")).await();
448+
QuicConnection connection = client.connect(SocketAddress.inetSocketAddress(9999, "localhost")).await();
449+
QuicStream stream = connection.openStream().await();
450+
AtomicInteger received = new AtomicInteger();
451+
AtomicInteger ended = new AtomicInteger();
452+
stream.handler(buf -> {
453+
received.addAndGet(buf.length());
454+
assertEquals(0, ended.get());
455+
stream.pause();
456+
vertx.setTimer(50, t -> stream.resume());
457+
});
458+
stream.endHandler(v -> {
459+
assertEquals(0, ended.getAndIncrement());
460+
});
461+
stream.end();
462+
assertWaitUntil(() -> received.get() == buffer.length());
463+
assertWaitUntil(() -> ended.get() > 0);
464+
}
435465
}

0 commit comments

Comments
 (0)