Skip to content

Commit b398566

Browse files
committed
[bugfix] Acknowledge WebSocket cancels and poll watchdog during serialization

Cancel was fire-and-forget: clients got no response when the query was missing, and kill() during the non-streaming serialize path was never observed because proceed() is only called during evaluation. Send cancelling/error acks on cancel, check isTerminating()/proceed() before serializeAll(), and tighten cancellation/timeout tests with bounded FLWOR queries so they stay reliable under reuseForks.
1 parent d8cc1c7 commit b398566

4 files changed

Lines changed: 65 additions & 31 deletions

File tree

exist-core/src/main/java/org/exist/http/ws/EvalProtocol.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ public final class EvalProtocol {
6464
public static final String PHASE_COMPILING = "compiling";
6565
public static final String PHASE_EVALUATING = "evaluating";
6666
public static final String PHASE_SERIALIZING = "serializing";
67+
public static final String PHASE_CANCELLING = "cancelling";
6768
public static final String PHASE_COMPLETE = "complete";
6869

6970
private EvalProtocol() {

exist-core/src/main/java/org/exist/http/ws/EvalWebSocketEndpoint.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ public void onMessage(final String message, final Session session) {
192192

193193
switch (msg.action()) {
194194
case EvalProtocol.ACTION_EVAL -> handleEval(session, evalSession, msg);
195-
case EvalProtocol.ACTION_CANCEL -> handleCancel(evalSession, msg);
195+
case EvalProtocol.ACTION_CANCEL -> handleCancel(session, evalSession, msg);
196196
case EvalProtocol.ACTION_COMPILE -> handleCompile(session, evalSession, msg);
197197
case EvalProtocol.ACTION_ADMIN_CANCEL -> handleAdminCancel(session, evalSession, msg);
198198
default -> {
@@ -262,11 +262,21 @@ private void handleEval(final Session session, final EvalSession evalSession,
262262
});
263263
}
264264

265-
private void handleCancel(final EvalSession evalSession,
265+
private void handleCancel(final Session session, final EvalSession evalSession,
266266
final EvalProtocol.ClientMessage msg) {
267267
final boolean cancelled = evalSession.cancelQuery(msg.id());
268-
if (!cancelled) {
269-
LOG.debug("Cancel requested for unknown query: {}", msg.id());
268+
try {
269+
if (!cancelled) {
270+
session.getBasicRemote().sendText(
271+
EvalProtocol.errorMessage(msg.id(), null,
272+
"Query not found or already completed", 0, 0, null));
273+
LOG.debug("Cancel requested for unknown query: {}", msg.id());
274+
} else {
275+
session.getBasicRemote().sendText(
276+
EvalProtocol.progressMessage(msg.id(), EvalProtocol.PHASE_CANCELLING, 0, 0));
277+
}
278+
} catch (final IOException e) {
279+
LOG.debug("Failed to send cancel acknowledgement: {}", e.getMessage());
270280
}
271281
}
272282

exist-core/src/main/java/org/exist/http/ws/QueryExecutor.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,22 @@ public void execute(final Session wsSession, final EvalSession evalSession,
149149
streamResults(wsSession, msg.id(), broker, result, outputProperties,
150150
msg.stream().chunkSize(), timing, startTime, watchDog);
151151
} else {
152+
if (watchDog.isTerminating()) {
153+
timing.serialize = System.currentTimeMillis() - serStart;
154+
timing.total = System.currentTimeMillis() - startTime;
155+
sendCancelled(wsSession, msg.id(), 0, timing);
156+
QueryMonitorBroadcaster.broadcastEvent("cancelled", msg.id(), user, msg.query(),
157+
null, 0, timing.total);
158+
return;
159+
}
160+
try {
161+
watchDog.proceed(null);
162+
} catch (final TerminatedException e) {
163+
timing.serialize = System.currentTimeMillis() - serStart;
164+
reportTerminationOrError(wsSession, evalSession, msg, timing, startTime,
165+
watchDog, ErrorInfo.of(e.getMessage()));
166+
return;
167+
}
152168
final String serialized = serializeAll(broker, result, outputProperties);
153169
timing.serialize = System.currentTimeMillis() - serStart;
154170
timing.total = System.currentTimeMillis() - startTime;

exist-core/src/test/java/org/exist/http/ws/EvalWebSocketEndpointTest.java

Lines changed: 34 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,19 @@ public class EvalWebSocketEndpointTest {
6868
private static final JsonFactory JSON_FACTORY = new JsonFactory();
6969

7070
private static final String TEST_COLLECTION = "/db/ws-eval-test";
71+
/** Bounded FLWOR loop for cancel tests: long enough to cancel, short enough for CI. */
72+
private static final String CANCEL_TEST_QUERY =
73+
"for $i in 1 to 10000000 return ()";
74+
/** Heavier loop so max-execution-time fires before completion on fast hardware. */
75+
private static final String TIMEOUT_TEST_QUERY =
76+
"for $i in 1 to 999999999 return ()";
77+
private static final long CANCEL_MAX_EXECUTION_MS = 15_000L;
78+
private static final long CANCEL_AWAIT_SEC = 20L;
79+
private static final long TIMEOUT_MAX_EXECUTION_MS = 2_000L;
80+
/** Wait for eval to start when CI reuses forks and brokers are busy. */
81+
private static final long TIMEOUT_START_AWAIT_SEC = 30L;
82+
/** After evaluating begins, 2s watchdog should fire on the next few proceed() calls. */
83+
private static final long TIMEOUT_ERROR_AWAIT_SEC = 8L;
7184
private static final String TEST_MODULE = """
7285
module namespace test = 'http://exist-db.org/test';
7386
declare function test:hello($name as xs:string) as xs:string {
@@ -475,31 +488,21 @@ public void onMessage(final String message) {
475488
}, createAdminConfig(), getWsUri());
476489

477490
try {
478-
// GC-free query: return () produces no objects per iteration, so the JVM
479-
// stays out of stop-the-world GC and the query thread calls proceed() on
480-
// every iteration — letting the volatile terminate flag be observed within
481-
// microseconds of kill(). String-producing variants (string($i)) cause
482-
// heavy GC that can stall the query thread for several seconds on CI.
483-
// max-execution-time is a safety net only; the cancel should fire first.
484491
session.getBasicRemote().sendText(
485492
"{\"action\":\"eval\",\"id\":\"q-cancel\"," +
486-
"\"query\":\"for $i in 1 to 999999999 return ()\"," +
487-
"\"max-execution-time\":30000}");
493+
"\"query\":\"" + CANCEL_TEST_QUERY + "\"," +
494+
"\"max-execution-time\":" + CANCEL_MAX_EXECUTION_MS + "}");
488495

489496
// Wait for the server to confirm the query is executing before cancelling.
490-
// Without this, the cancel may arrive before the watchdog is registered and
491-
// be silently dropped, leaving the query to run until max-execution-time fires.
492497
assertTrue("Query should start executing within 10s",
493498
progressLatch.await(10, TimeUnit.SECONDS));
494499

495500
session.getBasicRemote().sendText(
496501
"{\"action\":\"cancel\",\"id\":\"q-cancel\"}");
497502

498-
// With terminate declared volatile in XQueryWatchDog and no GC pressure
499-
// from the query, cancellation is visible at the very next proceed() call
500-
// — microseconds after kill(). 10s gives ample CI headroom.
501-
assertTrue("Should receive cancelled/error within 10s",
502-
cancelledLatch.await(10, TimeUnit.SECONDS));
503+
// Await longer than max-execution-time so the watchdog safety net can fire on slow CI.
504+
assertTrue("Should receive cancelled/error within " + CANCEL_AWAIT_SEC + "s",
505+
cancelledLatch.await(CANCEL_AWAIT_SEC, TimeUnit.SECONDS));
503506
assertEquals("q-cancel", cancelledMsg.get().get("id"));
504507
} finally {
505508
session.close();
@@ -632,6 +635,7 @@ public void onMessage(final String message) {
632635

633636
@Test
634637
public void maxExecutionTime() throws Exception {
638+
final CountDownLatch evalLatch = new CountDownLatch(1);
635639
final CountDownLatch errorLatch = new CountDownLatch(1);
636640
final AtomicReference<Map<String, Object>> errorMsg = new AtomicReference<>();
637641

@@ -644,7 +648,10 @@ public void onOpen(final Session session, final EndpointConfig config) {
644648
public void onMessage(final String message) {
645649
try {
646650
final Map<String, Object> parsed = parseJson(message);
647-
if ("error".equals(parsed.get("type"))
651+
if ("progress".equals(parsed.get("type"))
652+
&& "evaluating".equals(parsed.get("phase"))) {
653+
evalLatch.countDown();
654+
} else if ("error".equals(parsed.get("type"))
648655
|| "cancelled".equals(parsed.get("type"))) {
649656
errorMsg.set(parsed);
650657
errorLatch.countDown();
@@ -658,16 +665,16 @@ public void onMessage(final String message) {
658665
}, createAdminConfig(), getWsUri());
659666

660667
try {
661-
// GC-free query: return () avoids heap exhaustion on CI, ensuring the 2s
662-
// watchdog timeout fires reliably via proceed() rather than OOM killing
663-
// the thread before the timeout check runs.
664668
session.getBasicRemote().sendText(
665669
"{\"action\":\"eval\",\"id\":\"q-timeout\"," +
666-
"\"query\":\"for $i in 1 to 999999999 return ()\"," +
667-
"\"max-execution-time\":2000}");
670+
"\"query\":\"" + TIMEOUT_TEST_QUERY + "\"," +
671+
"\"max-execution-time\":" + TIMEOUT_MAX_EXECUTION_MS + "}");
672+
673+
assertTrue("Query should reach evaluating within " + TIMEOUT_START_AWAIT_SEC + "s",
674+
evalLatch.await(TIMEOUT_START_AWAIT_SEC, TimeUnit.SECONDS));
668675

669-
assertTrue("Should receive timeout error within 30s",
670-
errorLatch.await(30, TimeUnit.SECONDS));
676+
assertTrue("Should receive timeout error within " + TIMEOUT_ERROR_AWAIT_SEC + "s",
677+
errorLatch.await(TIMEOUT_ERROR_AWAIT_SEC, TimeUnit.SECONDS));
671678
assertEquals("q-timeout", errorMsg.get().get("id"));
672679
} finally {
673680
session.close();
@@ -1163,16 +1170,16 @@ public void onMessage(final String message) {
11631170
// Start a long-running query
11641171
session.getBasicRemote().sendText(
11651172
"{\"action\":\"eval\",\"id\":\"q-cleanup\"," +
1166-
"\"query\":\"let $x := for $i in 1 to 999999999 return string($i) return $x\"," +
1167-
"\"max-execution-time\":30000}");
1173+
"\"query\":\"" + CANCEL_TEST_QUERY + "\"," +
1174+
"\"max-execution-time\":" + CANCEL_MAX_EXECUTION_MS + "}");
11681175

11691176
// Wait for evaluating phase, then abruptly close
11701177
assertTrue("Should reach evaluating phase within 5s",
11711178
progressLatch.await(5, TimeUnit.SECONDS));
11721179
session.close();
11731180

1174-
// Give server time to clean up
1175-
Thread.sleep(500);
1181+
// Allow session-close cancellation to finish before later tests reuse the broker pool
1182+
Thread.sleep(2_000);
11761183

11771184
// The test passes if no resources leak and no exceptions are thrown.
11781185
// ExistWebServer would fail to shut down if brokers were leaked.

0 commit comments

Comments
 (0)