Skip to content

Commit 6357e04

Browse files
committed
[bugfix] Acknowledge WebSocket cancels and poll watchdog during
serialization Cancel was fire-and-forget: clients got no response when the query was missing, and kill() during the non-streaming serialize path was never observed because proceed() is only called during evaluation. Send cancelling/error acks on cancel, check isTerminating/proceed() before serializeAll(), and tighten cancellation/timeout tests with bounded FLWOR queries so they stay reliable under reuseForks.
1 parent d8cc1c7 commit 6357e04

4 files changed

Lines changed: 54 additions & 30 deletions

File tree

exist-core/src/main/java/org/exist/http/ws/EvalProtocol.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ public final class EvalProtocol {
6464
public static final String PHASE_COMPILING = "compiling";
6565
public static final String PHASE_EVALUATING = "evaluating";
6666
public static final String PHASE_SERIALIZING = "serializing";
67+
public static final String PHASE_CANCELLING = "cancelling";
6768
public static final String PHASE_COMPLETE = "complete";
6869

6970
private EvalProtocol() {

exist-core/src/main/java/org/exist/http/ws/EvalWebSocketEndpoint.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ public void onMessage(final String message, final Session session) {
192192

193193
switch (msg.action()) {
194194
case EvalProtocol.ACTION_EVAL -> handleEval(session, evalSession, msg);
195-
case EvalProtocol.ACTION_CANCEL -> handleCancel(evalSession, msg);
195+
case EvalProtocol.ACTION_CANCEL -> handleCancel(session, evalSession, msg);
196196
case EvalProtocol.ACTION_COMPILE -> handleCompile(session, evalSession, msg);
197197
case EvalProtocol.ACTION_ADMIN_CANCEL -> handleAdminCancel(session, evalSession, msg);
198198
default -> {
@@ -262,11 +262,21 @@ private void handleEval(final Session session, final EvalSession evalSession,
262262
});
263263
}
264264

265-
private void handleCancel(final EvalSession evalSession,
265+
private void handleCancel(final Session session, final EvalSession evalSession,
266266
final EvalProtocol.ClientMessage msg) {
267267
final boolean cancelled = evalSession.cancelQuery(msg.id());
268-
if (!cancelled) {
269-
LOG.debug("Cancel requested for unknown query: {}", msg.id());
268+
try {
269+
if (!cancelled) {
270+
session.getBasicRemote().sendText(
271+
EvalProtocol.errorMessage(msg.id(), null,
272+
"Query not found or already completed", 0, 0, null));
273+
LOG.debug("Cancel requested for unknown query: {}", msg.id());
274+
} else {
275+
session.getBasicRemote().sendText(
276+
EvalProtocol.progressMessage(msg.id(), EvalProtocol.PHASE_CANCELLING, 0, 0));
277+
}
278+
} catch (final IOException e) {
279+
LOG.debug("Failed to send cancel acknowledgement: {}", e.getMessage());
270280
}
271281
}
272282

exist-core/src/main/java/org/exist/http/ws/QueryExecutor.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,22 @@ public void execute(final Session wsSession, final EvalSession evalSession,
149149
streamResults(wsSession, msg.id(), broker, result, outputProperties,
150150
msg.stream().chunkSize(), timing, startTime, watchDog);
151151
} else {
152+
if (watchDog.isTerminating()) {
153+
timing.serialize = System.currentTimeMillis() - serStart;
154+
timing.total = System.currentTimeMillis() - startTime;
155+
sendCancelled(wsSession, msg.id(), 0, timing);
156+
QueryMonitorBroadcaster.broadcastEvent("cancelled", msg.id(), user, msg.query(),
157+
null, 0, timing.total);
158+
return;
159+
}
160+
try {
161+
watchDog.proceed(null);
162+
} catch (final TerminatedException e) {
163+
timing.serialize = System.currentTimeMillis() - serStart;
164+
reportTerminationOrError(wsSession, evalSession, msg, timing, startTime,
165+
watchDog, ErrorInfo.of(e.getMessage()));
166+
return;
167+
}
152168
final String serialized = serializeAll(broker, result, outputProperties);
153169
timing.serialize = System.currentTimeMillis() - serStart;
154170
timing.total = System.currentTimeMillis() - startTime;

exist-core/src/test/java/org/exist/http/ws/EvalWebSocketEndpointTest.java

Lines changed: 23 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,16 @@ public class EvalWebSocketEndpointTest {
6868
private static final JsonFactory JSON_FACTORY = new JsonFactory();
6969

7070
private static final String TEST_COLLECTION = "/db/ws-eval-test";
71+
/** Bounded FLWOR loop for cancel tests: long enough to cancel, short enough for CI. */
72+
private static final String CANCEL_TEST_QUERY =
73+
"for $i in 1 to 10000000 return ()";
74+
/** Heavier loop so max-execution-time fires before completion on fast hardware. */
75+
private static final String TIMEOUT_TEST_QUERY =
76+
"for $i in 1 to 999999999 return ()";
77+
private static final long CANCEL_MAX_EXECUTION_MS = 15_000L;
78+
private static final long CANCEL_AWAIT_SEC = 20L;
79+
private static final long TIMEOUT_MAX_EXECUTION_MS = 2_000L;
80+
private static final long TIMEOUT_AWAIT_SEC = 15L;
7181
private static final String TEST_MODULE = """
7282
module namespace test = 'http://exist-db.org/test';
7383
declare function test:hello($name as xs:string) as xs:string {
@@ -475,31 +485,21 @@ public void onMessage(final String message) {
475485
}, createAdminConfig(), getWsUri());
476486

477487
try {
478-
// GC-free query: return () produces no objects per iteration, so the JVM
479-
// stays out of stop-the-world GC and the query thread calls proceed() on
480-
// every iteration — letting the volatile terminate flag be observed within
481-
// microseconds of kill(). String-producing variants (string($i)) cause
482-
// heavy GC that can stall the query thread for several seconds on CI.
483-
// max-execution-time is a safety net only; the cancel should fire first.
484488
session.getBasicRemote().sendText(
485489
"{\"action\":\"eval\",\"id\":\"q-cancel\"," +
486-
"\"query\":\"for $i in 1 to 999999999 return ()\"," +
487-
"\"max-execution-time\":30000}");
490+
"\"query\":\"" + CANCEL_TEST_QUERY + "\"," +
491+
"\"max-execution-time\":" + CANCEL_MAX_EXECUTION_MS + "}");
488492

489493
// Wait for the server to confirm the query is executing before cancelling.
490-
// Without this, the cancel may arrive before the watchdog is registered and
491-
// be silently dropped, leaving the query to run until max-execution-time fires.
492494
assertTrue("Query should start executing within 10s",
493495
progressLatch.await(10, TimeUnit.SECONDS));
494496

495497
session.getBasicRemote().sendText(
496498
"{\"action\":\"cancel\",\"id\":\"q-cancel\"}");
497499

498-
// With terminate declared volatile in XQueryWatchDog and no GC pressure
499-
// from the query, cancellation is visible at the very next proceed() call
500-
// — microseconds after kill(). 10s gives ample CI headroom.
501-
assertTrue("Should receive cancelled/error within 10s",
502-
cancelledLatch.await(10, TimeUnit.SECONDS));
500+
// Await longer than max-execution-time so the watchdog safety net can fire on slow CI.
501+
assertTrue("Should receive cancelled/error within " + CANCEL_AWAIT_SEC + "s",
502+
cancelledLatch.await(CANCEL_AWAIT_SEC, TimeUnit.SECONDS));
503503
assertEquals("q-cancel", cancelledMsg.get().get("id"));
504504
} finally {
505505
session.close();
@@ -658,16 +658,13 @@ public void onMessage(final String message) {
658658
}, createAdminConfig(), getWsUri());
659659

660660
try {
661-
// GC-free query: return () avoids heap exhaustion on CI, ensuring the 2s
662-
// watchdog timeout fires reliably via proceed() rather than OOM killing
663-
// the thread before the timeout check runs.
664661
session.getBasicRemote().sendText(
665662
"{\"action\":\"eval\",\"id\":\"q-timeout\"," +
666-
"\"query\":\"for $i in 1 to 999999999 return ()\"," +
667-
"\"max-execution-time\":2000}");
663+
"\"query\":\"" + TIMEOUT_TEST_QUERY + "\"," +
664+
"\"max-execution-time\":" + TIMEOUT_MAX_EXECUTION_MS + "}");
668665

669-
assertTrue("Should receive timeout error within 30s",
670-
errorLatch.await(30, TimeUnit.SECONDS));
666+
assertTrue("Should receive timeout error within " + TIMEOUT_AWAIT_SEC + "s",
667+
errorLatch.await(TIMEOUT_AWAIT_SEC, TimeUnit.SECONDS));
671668
assertEquals("q-timeout", errorMsg.get().get("id"));
672669
} finally {
673670
session.close();
@@ -1163,16 +1160,16 @@ public void onMessage(final String message) {
11631160
// Start a long-running query
11641161
session.getBasicRemote().sendText(
11651162
"{\"action\":\"eval\",\"id\":\"q-cleanup\"," +
1166-
"\"query\":\"let $x := for $i in 1 to 999999999 return string($i) return $x\"," +
1167-
"\"max-execution-time\":30000}");
1163+
"\"query\":\"" + CANCEL_TEST_QUERY + "\"," +
1164+
"\"max-execution-time\":" + CANCEL_MAX_EXECUTION_MS + "}");
11681165

11691166
// Wait for evaluating phase, then abruptly close
11701167
assertTrue("Should reach evaluating phase within 5s",
11711168
progressLatch.await(5, TimeUnit.SECONDS));
11721169
session.close();
11731170

1174-
// Give server time to clean up
1175-
Thread.sleep(500);
1171+
// Allow session-close cancellation to finish before later tests reuse the broker pool
1172+
Thread.sleep(2_000);
11761173

11771174
// The test passes if no resources leak and no exceptions are thrown.
11781175
// ExistWebServer would fail to shut down if brokers were leaked.

0 commit comments

Comments
 (0)