4545import java .util .Map ;
4646import java .util .Optional ;
4747import java .util .Properties ;
48+ import java .util .concurrent .atomic .AtomicBoolean ;
4849
4950/**
5051 * Executes XQuery expressions for the /ws/eval endpoint with support for
@@ -56,6 +57,9 @@ public final class QueryExecutor {
5657
5758 private static final long PROGRESS_INTERVAL_MS = 500 ;
5859
60+ static final String WALL_CLOCK_TIMEOUT_MESSAGE =
61+ "The query exceeded the predefined timeout and has been killed." ;
62+
5963 private final BrokerPool pool ;
6064
6165 public QueryExecutor (final BrokerPool pool ) {
@@ -110,6 +114,14 @@ public void execute(final Session wsSession, final EvalSession evalSession,
110114 }
111115 evalSession .registerQuery (msg .id (), watchDog );
112116
117+ final AtomicBoolean terminalResponseSent = new AtomicBoolean (false );
118+ WallClockQueryTimeout wallClockTimeout = null ;
119+ if (msg .maxExecutionTime () > 0 ) {
120+ wallClockTimeout = new WallClockQueryTimeout ();
121+ wallClockTimeout .schedule (msg .maxExecutionTime (), () -> handleWallClockTimeout (
122+ wsSession , evalSession , msg , timing , startTime , watchDog , terminalResponseSent ));
123+ }
124+
113125 try {
114126 // Evaluate phase
115127 sendProgress (wsSession , msg .id (), EvalProtocol .PHASE_EVALUATING , 0 ,
@@ -124,12 +136,13 @@ public void execute(final Session wsSession, final EvalSession evalSession,
124136 } catch (final TerminatedException e ) {
125137 timing .evaluate = System .currentTimeMillis () - evalStart ;
126138 reportTerminationOrError (wsSession , evalSession , msg , timing , startTime ,
127- watchDog , ErrorInfo .of (e .getMessage (), e .getLine (), e .getColumn ()));
139+ watchDog , ErrorInfo .of (e .getMessage (), e .getLine (), e .getColumn ()),
140+ terminalResponseSent );
128141 return ;
129142 } catch (final XPathException e ) {
130143 timing .evaluate = System .currentTimeMillis () - evalStart ;
131144 reportTerminationOrError (wsSession , evalSession , msg , timing , startTime ,
132- watchDog , ErrorInfo .of (e ));
145+ watchDog , ErrorInfo .of (e ), terminalResponseSent );
133146 return ;
134147 }
135148
@@ -147,37 +160,43 @@ public void execute(final Session wsSession, final EvalSession evalSession,
147160 try {
148161 if (msg .stream ().enabled () && itemCount > msg .stream ().chunkSize ()) {
149162 streamResults (wsSession , msg .id (), broker , result , outputProperties ,
150- msg .stream ().chunkSize (), timing , startTime , watchDog );
163+ msg .stream ().chunkSize (), timing , startTime , watchDog ,
164+ terminalResponseSent , evalSession , msg );
151165 } else {
152166 if (watchDog .isTerminating ()) {
153167 timing .serialize = System .currentTimeMillis () - serStart ;
154168 timing .total = System .currentTimeMillis () - startTime ;
155- sendCancelled (wsSession , msg .id (), 0 , timing );
156- QueryMonitorBroadcaster .broadcastEvent ("cancelled" , msg .id (), user , msg .query (),
157- null , 0 , timing .total );
169+ sendCancelledIfAbsent (wsSession , msg .id (), evalSession , msg , timing ,
170+ 0 , terminalResponseSent );
158171 return ;
159172 }
160173 try {
161174 watchDog .proceed (null );
162175 } catch (final TerminatedException e ) {
163176 timing .serialize = System .currentTimeMillis () - serStart ;
164177 reportTerminationOrError (wsSession , evalSession , msg , timing , startTime ,
165- watchDog , ErrorInfo .of (e .getMessage ()));
178+ watchDog , ErrorInfo .of (e .getMessage ()), terminalResponseSent );
166179 return ;
167180 }
168181 final String serialized = serializeAll (broker , result , outputProperties );
169182 timing .serialize = System .currentTimeMillis () - serStart ;
170183 timing .total = System .currentTimeMillis () - startTime ;
171184
172- sendResult (wsSession , msg .id (), 1 , serialized , false , timing , itemCount );
185+ if (terminalResponseSent .compareAndSet (false , true )) {
186+ sendResult (wsSession , msg .id (), 1 , serialized , false , timing , itemCount );
187+ QueryMonitorBroadcaster .broadcastEvent ("completed" , msg .id (), user , msg .query (),
188+ null , itemCount , timing .total );
189+ }
173190 }
174- QueryMonitorBroadcaster .broadcastEvent ("completed" , msg .id (), user , msg .query (),
175- null , itemCount , System .currentTimeMillis () - startTime );
176191 } catch (final SAXException | XPathException e ) {
177192 timing .serialize = System .currentTimeMillis () - serStart ;
178- reportError (wsSession , evalSession , msg , timing , startTime , ErrorInfo .of (e .getMessage ()));
193+ reportError (wsSession , evalSession , msg , timing , startTime , ErrorInfo .of (e .getMessage ()),
194+ terminalResponseSent );
179195 }
180196 } finally {
197+ if (wallClockTimeout != null ) {
198+ wallClockTimeout .cancel ();
199+ }
181200 evalSession .unregisterQuery (msg .id ());
182201 context .runCleanupTasks ();
183202 context .reset (); // needed: resetContext=false skipped this in xquery.execute()
@@ -209,6 +228,16 @@ static ErrorInfo of(final String message) {
209228 private void reportError (final Session wsSession , final EvalSession evalSession ,
210229 final EvalProtocol .ClientMessage msg , final EvalProtocol .Timing timing ,
211230 final long startTime , final ErrorInfo error ) {
231+ reportError (wsSession , evalSession , msg , timing , startTime , error , null );
232+ }
233+
234+ private void reportError (final Session wsSession , final EvalSession evalSession ,
235+ final EvalProtocol .ClientMessage msg , final EvalProtocol .Timing timing ,
236+ final long startTime , final ErrorInfo error ,
237+ @ Nullable final AtomicBoolean terminalGate ) {
238+ if (terminalGate != null && !terminalGate .compareAndSet (false , true )) {
239+ return ;
240+ }
212241 timing .total = System .currentTimeMillis () - startTime ;
213242 if (error .xpe () != null ) {
214243 sendError (wsSession , msg .id (), error .xpe (), timing );
@@ -224,16 +253,50 @@ private void reportTerminationOrError(final Session wsSession, final EvalSession
224253 final EvalProtocol .Timing timing , final long startTime ,
225254 final XQueryWatchDog watchDog ,
226255 final ErrorInfo error ) {
256+ reportTerminationOrError (wsSession , evalSession , msg , timing , startTime , watchDog , error , null );
257+ }
258+
259+ private void reportTerminationOrError (final Session wsSession , final EvalSession evalSession ,
260+ final EvalProtocol .ClientMessage msg ,
261+ final EvalProtocol .Timing timing , final long startTime ,
262+ final XQueryWatchDog watchDog ,
263+ final ErrorInfo error ,
264+ @ Nullable final AtomicBoolean terminalGate ) {
227265 if (watchDog .isTerminating ()) {
228266 timing .total = System .currentTimeMillis () - startTime ;
229- sendCancelled (wsSession , msg .id (), 0 , timing );
230- QueryMonitorBroadcaster .broadcastEvent ("cancelled" , msg .id (),
231- evalSession .getSubject ().getName (), msg .query (), null , 0 , timing .total );
267+ sendCancelledIfAbsent (wsSession , msg .id (), evalSession , msg , timing , 0 , terminalGate );
232268 } else {
233- reportError (wsSession , evalSession , msg , timing , startTime , error );
269+ reportError (wsSession , evalSession , msg , timing , startTime , error , terminalGate );
270+ }
271+ }
272+
273+ private void handleWallClockTimeout (final Session wsSession , final EvalSession evalSession ,
274+ final EvalProtocol .ClientMessage msg ,
275+ final EvalProtocol .Timing timing , final long startTime ,
276+ final XQueryWatchDog watchDog ,
277+ final AtomicBoolean terminalResponseSent ) {
278+ watchDog .kill (0 );
279+ if (terminalResponseSent .compareAndSet (false , true )) {
280+ timing .total = System .currentTimeMillis () - startTime ;
281+ sendError (wsSession , msg .id (), null , WALL_CLOCK_TIMEOUT_MESSAGE , 0 , 0 , timing );
282+ QueryMonitorBroadcaster .broadcastEvent ("error" , msg .id (),
283+ evalSession .getSubject ().getName (), msg .query (), null , 0 , timing .total );
234284 }
235285 }
236286
287+ private void sendCancelledIfAbsent (final Session wsSession , final String queryId ,
288+ final EvalSession evalSession ,
289+ final EvalProtocol .ClientMessage msg ,
290+ final EvalProtocol .Timing timing , final long items ,
291+ @ Nullable final AtomicBoolean terminalGate ) {
292+ if (terminalGate != null && !terminalGate .compareAndSet (false , true )) {
293+ return ;
294+ }
295+ sendCancelled (wsSession , queryId , items , timing );
296+ QueryMonitorBroadcaster .broadcastEvent ("cancelled" , queryId ,
297+ evalSession .getSubject ().getName (), msg .query (), null , items , timing .total );
298+ }
299+
237300 /**
238301 * Compile-check a query without executing it.
239302 */
@@ -290,7 +353,10 @@ private void streamResults(final Session wsSession, final String queryId,
290353 final DBBroker broker , final Sequence result ,
291354 final Properties outputProperties , final int chunkSize ,
292355 final EvalProtocol .Timing timing , final long startTime ,
293- final XQueryWatchDog watchDog ) {
356+ final XQueryWatchDog watchDog ,
357+ final AtomicBoolean terminalResponseSent ,
358+ final EvalSession evalSession ,
359+ final EvalProtocol .ClientMessage msg ) {
294360 final long serStart = System .currentTimeMillis ();
295361 final long totalItems = result .getItemCount ();
296362 int chunkNum = 0 ;
@@ -299,10 +365,11 @@ private void streamResults(final Session wsSession, final String queryId,
299365 try {
300366 final SequenceIterator iter = result .iterate ();
301367 while (iter .hasNext ()) {
302- if (watchDog .isTerminating ()) {
368+ if (watchDog .isTerminating () || terminalResponseSent . get () ) {
303369 timing .serialize = System .currentTimeMillis () - serStart ;
304370 timing .total = System .currentTimeMillis () - startTime ;
305- sendCancelled (wsSession , queryId , itemsSent , timing );
371+ sendCancelledIfAbsent (wsSession , queryId , evalSession , msg , timing ,
372+ itemsSent , terminalResponseSent );
306373 return ;
307374 }
308375
@@ -321,11 +388,16 @@ private void streamResults(final Session wsSession, final String queryId,
321388 if (!more ) {
322389 timing .serialize = System .currentTimeMillis () - serStart ;
323390 timing .total = System .currentTimeMillis () - startTime ;
391+ if (terminalResponseSent .compareAndSet (false , true )) {
392+ sendResult (wsSession , queryId , chunkNum , data , false , timing , totalItems );
393+ QueryMonitorBroadcaster .broadcastEvent ("completed" , queryId ,
394+ evalSession .getSubject ().getName (), msg .query (),
395+ null , totalItems , timing .total );
396+ }
397+ } else {
398+ sendResult (wsSession , queryId , chunkNum , data , true , null , totalItems );
324399 }
325400
326- sendResult (wsSession , queryId , chunkNum , data , more ,
327- more ? null : timing , totalItems );
328-
329401 // send progress during streaming
330402 if (more && (System .currentTimeMillis () - startTime ) % PROGRESS_INTERVAL_MS < 50 ) {
331403 sendProgress (wsSession , queryId , EvalProtocol .PHASE_SERIALIZING ,
@@ -335,7 +407,8 @@ private void streamResults(final Session wsSession, final String queryId,
335407 } catch (final XPathException | SAXException e ) {
336408 timing .serialize = System .currentTimeMillis () - serStart ;
337409 timing .total = System .currentTimeMillis () - startTime ;
338- sendError (wsSession , queryId , null , e .getMessage (), 0 , 0 , timing );
410+ reportError (wsSession , evalSession , msg , timing , startTime , ErrorInfo .of (e .getMessage ()),
411+ terminalResponseSent );
339412 }
340413 }
341414
0 commit comments