Skip to content

Commit 488a804

Browse files
committed
[bugfix] WebSocketEndpoint: suppress ClosedChannelException
replace text ping with WebSocket PING control frame serialize Long and Double as JSON numbers clear sessions on shutdown handle interruptions more gracefully
1 parent 37d5309 commit 488a804

1 file changed

Lines changed: 34 additions & 13 deletions

File tree

exist-core/src/main/java/org/exist/xquery/functions/websocket/WebSocketEndpoint.java

Lines changed: 34 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@
3535
import jakarta.websocket.server.ServerEndpoint;
3636
import java.io.IOException;
3737
import java.io.StringWriter;
38-
import java.util.Iterator;
38+
import java.nio.ByteBuffer;
39+
import java.nio.channels.ClosedChannelException;
3940
import java.util.Map;
4041
import java.util.concurrent.ConcurrentHashMap;
4142
import java.util.concurrent.Executors;
@@ -63,6 +64,7 @@ public class WebSocketEndpoint {
6364
private static final Logger LOG = LogManager.getLogger(WebSocketEndpoint.class);
6465
private static final JsonFactory JSON_FACTORY = new JsonFactory();
6566
private static final Map<Session, String> sessions = new ConcurrentHashMap<>();
67+
private static final ByteBuffer PING_PAYLOAD = ByteBuffer.allocate(0);
6668

6769
private static volatile boolean initialized = false;
6870
private static ScheduledExecutorService heartbeatService = null;
@@ -109,13 +111,13 @@ public static synchronized void shutdown() {
109111
monitorService.shutdown();
110112
monitorService = null;
111113
}
114+
sessions.clear();
112115
initialized = false;
113116
}
114117
}
115118

116119
@OnOpen
117120
public void openSession(final Session session) {
118-
session.setMaxIdleTimeout(10000);
119121
sessions.put(session, DEFAULT_CHANNEL);
120122
}
121123

@@ -124,6 +126,16 @@ public void closeSession(final Session session, final CloseReason closeReason) {
124126
sessions.remove(session);
125127
}
126128

129+
@OnError
130+
public void onError(final Session session, final Throwable throwable) {
131+
sessions.remove(session);
132+
if (throwable instanceof ClosedChannelException) {
133+
LOG.debug("WebSocket client disconnected abruptly: session {}", session.getId());
134+
} else {
135+
LOG.warn("WebSocket error on session {}: {}", session.getId(), throwable.getMessage(), throwable);
136+
}
137+
}
138+
127139
@OnMessage
128140
public void recv(final String message, final Session session) {
129141
try (final JsonParser parser = JSON_FACTORY.createParser(message)) {
@@ -142,7 +154,16 @@ public void recv(final String message, final Session session) {
142154
}
143155

144156
static void pingAll() {
145-
sendAll(null, "ping");
157+
for (final Session session : sessions.keySet()) {
158+
try {
159+
session.getBasicRemote().sendPing(PING_PAYLOAD.duplicate());
160+
} catch (final ClosedChannelException e) {
161+
sessions.remove(session);
162+
} catch (final IOException e) {
163+
LOG.debug("Ping failed, removing session {}: {}", session.getId(), e.getMessage());
164+
sessions.remove(session);
165+
}
166+
}
146167
}
147168

148169
/**
@@ -161,6 +182,8 @@ public static void sendAll(final String toChannel, final Map<String, Object> dat
161182
switch (entry.getValue()) {
162183
case String s -> gen.writeStringField(key, s);
163184
case Integer i -> gen.writeNumberField(key, i);
185+
case Long l -> gen.writeNumberField(key, l);
186+
case Double d -> gen.writeNumberField(key, d);
164187
case Boolean b -> gen.writeBooleanField(key, b);
165188
case null -> gen.writeNullField(key);
166189
default -> gen.writeStringField(key, entry.getValue().toString());
@@ -181,18 +204,16 @@ public static void sendAll(final String toChannel, final Map<String, Object> dat
181204
* @param message the message text
182205
*/
183206
public static void sendAll(final String toChannel, final String message) {
184-
final Iterator<Map.Entry<Session, String>> iterator = sessions.entrySet().iterator();
185-
while (iterator.hasNext()) {
186-
try {
187-
final Map.Entry<Session, String> entry = iterator.next();
188-
final Session session = entry.getKey();
189-
final String channel = entry.getValue();
190-
191-
if (toChannel == null || (!channel.equals(DEFAULT_CHANNEL) && toChannel.equals(channel))) {
207+
for (final Map.Entry<Session, String> entry : sessions.entrySet()) {
208+
final Session session = entry.getKey();
209+
final String channel = entry.getValue();
210+
if (toChannel == null || (!channel.equals(DEFAULT_CHANNEL) && toChannel.equals(channel))) {
211+
try {
192212
session.getBasicRemote().sendText(message);
213+
} catch (final IOException e) {
214+
LOG.debug("Removing disconnected WebSocket session: {}", e.getMessage());
215+
sessions.remove(session);
193216
}
194-
} catch (final IOException e) {
195-
LOG.error("Error sending message via websocket: {}", e.getMessage(), e);
196217
}
197218
}
198219
}

0 commit comments

Comments
 (0)