Skip to content

Commit 70932ea

Browse files
ptrthomasclaude
andcommitted
http: pre-handshake WS client listeners + 1MB server WS receive cap
Two riders for the karate-ext single-port noVNC tunnel (D103): - WsClientOptions.Builder.onMessage/onClose/onError register listeners in the WsClient constructor, BEFORE the handshake — for server-speaks-first protocols (RFB/VNC behind websockify) a listener added after connect() returns can miss the opening frame. - HttpServerHandler's WebSocketServerHandshakerFactory now allows 1MB inbound frames (Netty defaults to 64k) — proxied streams (e.g. a VNC clipboard paste) can legitimately exceed the default. - WsServerClientIntegrationTest: first end-to-end HttpServer+WsClient coverage — upgrade handshake, server-first greeting via the options-registered listener, >64k binary echo. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
1 parent 738713a commit 70932ea

4 files changed

Lines changed: 156 additions & 1 deletion

File tree

karate-core/src/main/java/io/karatelabs/http/HttpServerHandler.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,9 @@ static boolean isWsUpgrade(FullHttpRequest req) {
173173

174174
private void handleWsUpgrade(ChannelHandlerContext ctx, FullHttpRequest req, HttpRequest request) {
175175
String wsUrl = "ws://" + req.headers().get(HttpHeaderNames.HOST) + req.uri();
176-
WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory(wsUrl, null, true);
176+
// 1 MB receive cap (Netty defaults to 64k) — proxied streams (e.g. a VNC clipboard paste
177+
// through the websockify tunnel) can legitimately exceed the default
178+
WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory(wsUrl, null, true, HttpUtils.MEGABYTE);
177179
WebSocketServerHandshaker handshaker = factory.newHandshaker(req);
178180
if (handshaker == null) {
179181
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());

karate-core/src/main/java/io/karatelabs/http/WsClient.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,17 @@ private WsClient(WsClientOptions options) {
110110
this.callbackExecutor = options.getCallbackExecutor() != null
111111
? options.getCallbackExecutor()
112112
: CALLBACK_EXECUTOR;
113+
// options-registered listeners attach BEFORE the handshake so a server-speaks-first
114+
// protocol (RFB/VNC) cannot race its opening frame past an empty listener list
115+
if (options.getMessageListener() != null) {
116+
messageListeners.add(options.getMessageListener());
117+
}
118+
if (options.getCloseListener() != null) {
119+
closeListeners.add(options.getCloseListener());
120+
}
121+
if (options.getErrorListener() != null) {
122+
errorListeners.add(options.getErrorListener());
123+
}
113124
}
114125

115126
private void doConnect() {

karate-core/src/main/java/io/karatelabs/http/WsClientOptions.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.LinkedHashMap;
3232
import java.util.Map;
3333
import java.util.concurrent.ExecutorService;
34+
import java.util.function.Consumer;
3435

3536
/**
3637
* Configuration for WebSocket client connections using builder pattern.
@@ -46,6 +47,9 @@ public class WsClientOptions {
4647
private final boolean trustAllCerts;
4748
private final SslContext sslContext;
4849
private final ExecutorService callbackExecutor;
50+
private final Consumer<WsFrame> messageListener;
51+
private final Runnable closeListener;
52+
private final Consumer<Throwable> errorListener;
4953

5054
private WsClientOptions(Builder builder) {
5155
this.uri = builder.uri;
@@ -59,6 +63,9 @@ private WsClientOptions(Builder builder) {
5963
this.trustAllCerts = builder.trustAllCerts;
6064
this.sslContext = builder.sslContext;
6165
this.callbackExecutor = builder.callbackExecutor;
66+
this.messageListener = builder.messageListener;
67+
this.closeListener = builder.closeListener;
68+
this.errorListener = builder.errorListener;
6269
}
6370

6471
public static Builder builder(String uri) {
@@ -124,6 +131,18 @@ public ExecutorService getCallbackExecutor() {
124131
return callbackExecutor;
125132
}
126133

134+
public Consumer<WsFrame> getMessageListener() {
135+
return messageListener;
136+
}
137+
138+
public Runnable getCloseListener() {
139+
return closeListener;
140+
}
141+
142+
public Consumer<Throwable> getErrorListener() {
143+
return errorListener;
144+
}
145+
127146
public static class Builder {
128147

129148
private final URI uri;
@@ -135,6 +154,9 @@ public static class Builder {
135154
private boolean trustAllCerts = true;
136155
private SslContext sslContext;
137156
private ExecutorService callbackExecutor;
157+
private Consumer<WsFrame> messageListener;
158+
private Runnable closeListener;
159+
private Consumer<Throwable> errorListener;
138160

139161
private Builder(URI uri) {
140162
if (uri == null) {
@@ -196,6 +218,28 @@ public Builder callbackExecutor(ExecutorService executor) {
196218
return this;
197219
}
198220

221+
/**
222+
* Register a message listener BEFORE the connection is opened. For server-speaks-first
223+
* protocols (e.g. RFB/VNC behind websockify) a listener added after
224+
* {@link WsClient#connect} returns can miss the server's opening frame.
225+
*/
226+
public Builder onMessage(Consumer<WsFrame> listener) {
227+
this.messageListener = listener;
228+
return this;
229+
}
230+
231+
/** Register a close listener before the connection is opened (see {@link #onMessage}). */
232+
public Builder onClose(Runnable listener) {
233+
this.closeListener = listener;
234+
return this;
235+
}
236+
237+
/** Register an error listener before the connection is opened (see {@link #onMessage}). */
238+
public Builder onError(Consumer<Throwable> listener) {
239+
this.errorListener = listener;
240+
return this;
241+
}
242+
199243
public WsClientOptions build() {
200244
return new WsClientOptions(this);
201245
}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* The MIT License
3+
*
4+
* Copyright 2026 Karate Labs Inc.
5+
*
6+
* Permission is hereby granted, free of charge, to any person obtaining a copy
7+
* of this software and associated documentation files (the "Software"), to deal
8+
* in the Software without restriction, including without limitation the rights
9+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
* copies of the Software, and to permit persons to whom the Software is
11+
* furnished to do so, subject to the following conditions:
12+
*
13+
* The above copyright notice and this permission notice shall be included in
14+
* all copies or substantial portions of the Software.
15+
*
16+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22+
* THE SOFTWARE.
23+
*/
24+
package io.karatelabs.http;
25+
26+
import org.junit.jupiter.api.AfterAll;
27+
import org.junit.jupiter.api.BeforeAll;
28+
import org.junit.jupiter.api.Test;
29+
30+
import java.util.Arrays;
31+
import java.util.List;
32+
import java.util.concurrent.CopyOnWriteArrayList;
33+
import java.util.concurrent.CountDownLatch;
34+
import java.util.concurrent.TimeUnit;
35+
36+
import static org.junit.jupiter.api.Assertions.*;
37+
38+
/**
39+
* End-to-end WebSocket coverage of {@link HttpServer} + {@link WsClient}: the upgrade handshake,
40+
* a server-speaks-first opening frame caught by an options-registered listener (a listener added
41+
* only after {@code connect()} returns can miss it), and a binary echo above Netty's 64k default
42+
* receive cap (the handshaker is configured to 1 MB for proxied streams like a websockify tunnel).
43+
*/
44+
class WsServerClientIntegrationTest {
45+
46+
private static HttpServer server;
47+
48+
@BeforeAll
49+
static void start() {
50+
server = HttpServer.start(0,
51+
req -> HttpResponse.notFound("ws only"),
52+
null,
53+
(req, connection) -> {
54+
// server speaks FIRST (the RFB/VNC shape), then echoes binary back
55+
connection.send("hello:" + req.getPath());
56+
connection.onBinary(connection::sendBytes);
57+
});
58+
}
59+
60+
@AfterAll
61+
static void stop() {
62+
server.stopAsync();
63+
}
64+
65+
@Test
66+
void serverGreetingAndLargeBinaryEchoRoundTrip() throws Exception {
67+
byte[] big = new byte[200 * 1024]; // > the 64k Netty default the handshaker overrides
68+
for (int i = 0; i < big.length; i++) {
69+
big[i] = (byte) i;
70+
}
71+
CountDownLatch greeting = new CountDownLatch(1);
72+
CountDownLatch echoed = new CountDownLatch(1);
73+
List<WsFrame> frames = new CopyOnWriteArrayList<>();
74+
WsClient client = WsClient.connect(WsClientOptions
75+
.builder("ws://localhost:" + server.getPort() + "/tunnel")
76+
.onMessage(frame -> {
77+
frames.add(frame);
78+
if (frame.isText()) {
79+
greeting.countDown();
80+
} else {
81+
echoed.countDown();
82+
}
83+
})
84+
.build());
85+
try {
86+
// the opening frame arrives even though no listener was added post-connect
87+
assertTrue(greeting.await(5, TimeUnit.SECONDS), "server-first greeting frame");
88+
assertEquals("hello:/tunnel", frames.get(0).getText());
89+
client.send(big);
90+
assertTrue(echoed.await(5, TimeUnit.SECONDS), "large binary echo");
91+
byte[] back = frames.get(frames.size() - 1).getBytes();
92+
assertTrue(Arrays.equals(big, back), "binary payload survives the round-trip");
93+
} finally {
94+
client.close();
95+
}
96+
}
97+
98+
}

0 commit comments

Comments
 (0)