|
1 | 1 | /* |
2 | | - * Copyright (c) 2005, 2023, Oracle and/or its affiliates. All rights reserved. |
| 2 | + * Copyright (c) 2005, 2025, Oracle and/or its affiliates. All rights reserved. |
3 | 3 | * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
4 | 4 | * |
5 | 5 | * This code is free software; you can redistribute it and/or modify it |
|
61 | 61 | import java.util.Set; |
62 | 62 | import java.util.Timer; |
63 | 63 | import java.util.TimerTask; |
| 64 | +import java.util.concurrent.CountDownLatch; |
64 | 65 | import java.util.concurrent.Executor; |
| 66 | +import java.util.concurrent.TimeUnit; |
65 | 67 |
|
66 | 68 | import static java.nio.charset.StandardCharsets.ISO_8859_1; |
67 | 69 | import static sun.net.httpserver.Utils.isValidName; |
@@ -94,7 +96,7 @@ class ServerImpl { |
94 | 96 | private final Set<HttpConnection> rspConnections; |
95 | 97 | private List<Event> events; |
96 | 98 | private final Object lolock = new Object(); |
97 | | - private volatile boolean finished = false; |
| 99 | + private final CountDownLatch finishedLatch = new CountDownLatch(1); |
98 | 100 | private volatile boolean terminating = false; |
99 | 101 | private boolean bound = false; |
100 | 102 | private boolean started = false; |
@@ -180,7 +182,7 @@ public void bind (InetSocketAddress addr, int backlog) throws IOException { |
180 | 182 | } |
181 | 183 |
|
182 | 184 | public void start () { |
183 | | - if (!bound || started || finished) { |
| 185 | + if (!bound || started || finished()) { |
184 | 186 | throw new IllegalStateException ("server in wrong state"); |
185 | 187 | } |
186 | 188 | if (executor == null) { |
@@ -223,45 +225,75 @@ public HttpsConfigurator getHttpsConfigurator () { |
223 | 225 | return httpsConfig; |
224 | 226 | } |
225 | 227 |
|
| 228 | + private final boolean finished(){ |
| 229 | + // if the latch is 0, the server is finished |
| 230 | + return finishedLatch.getCount() == 0; |
| 231 | + } |
| 232 | + |
226 | 233 | public final boolean isFinishing() { |
227 | | - return finished; |
| 234 | + return finished(); |
228 | 235 | } |
229 | 236 |
|
| 237 | + /** |
| 238 | + * This method stops the server by adding a stop request event and |
| 239 | + * waiting for the server until the event is triggered or until the maximum delay is triggered. |
| 240 | + * <p> |
| 241 | + * This ensures that the server is stopped immediately after all exchanges are complete. HttpConnections will be forcefully closed if active exchanges do not |
| 242 | + * complete within the imparted delay. |
| 243 | + * |
| 244 | + * @param delay maximum delay to wait for exchanges completion, in seconds |
| 245 | + */ |
230 | 246 | public void stop (int delay) { |
231 | 247 | if (delay < 0) { |
232 | 248 | throw new IllegalArgumentException ("negative delay parameter"); |
233 | 249 | } |
| 250 | + |
| 251 | + logger.log(Level.TRACE, "stopping"); |
| 252 | + // posting a stop event, which will flip finished flag if it finishes |
| 253 | + // before the timeout in this method |
234 | 254 | terminating = true; |
| 255 | + |
| 256 | + addEvent(new Event.StopRequested()); |
| 257 | + |
235 | 258 | try { schan.close(); } catch (IOException e) {} |
236 | 259 | selector.wakeup(); |
237 | | - long latest = System.currentTimeMillis() + delay * 1000; |
238 | | - while (System.currentTimeMillis() < latest) { |
239 | | - delay(); |
240 | | - if (finished) { |
241 | | - break; |
| 260 | + |
| 261 | + try { |
| 262 | + // waiting for the duration of the delay, unless released before |
| 263 | + finishedLatch.await(delay, TimeUnit.SECONDS); |
| 264 | + |
| 265 | + } catch (InterruptedException e) { |
| 266 | + logger.log(Level.TRACE, "Error in awaiting the delay"); |
| 267 | + |
| 268 | + } finally { |
| 269 | + |
| 270 | + logger.log(Level.TRACE, "closing connections"); |
| 271 | + finishedLatch.countDown(); |
| 272 | + selector.wakeup(); |
| 273 | + synchronized (allConnections) { |
| 274 | + for (HttpConnection c : allConnections) { |
| 275 | + c.close(); |
| 276 | + } |
242 | 277 | } |
243 | | - } |
244 | | - finished = true; |
245 | | - selector.wakeup(); |
246 | | - synchronized (allConnections) { |
247 | | - for (HttpConnection c : allConnections) { |
248 | | - c.close(); |
| 278 | + allConnections.clear(); |
| 279 | + idleConnections.clear(); |
| 280 | + newlyAcceptedConnections.clear(); |
| 281 | + timer.cancel(); |
| 282 | + if (reqRspTimeoutEnabled) { |
| 283 | + timer1.cancel(); |
249 | 284 | } |
250 | | - } |
251 | | - allConnections.clear(); |
252 | | - idleConnections.clear(); |
253 | | - newlyAcceptedConnections.clear(); |
254 | | - timer.cancel(); |
255 | | - if (reqRspTimeoutEnabled) { |
256 | | - timer1.cancel(); |
257 | | - } |
258 | | - if (dispatcherThread != null && dispatcherThread != Thread.currentThread()) { |
259 | | - try { |
260 | | - dispatcherThread.join(); |
261 | | - } catch (InterruptedException e) { |
262 | | - Thread.currentThread().interrupt(); |
263 | | - logger.log (Level.TRACE, "ServerImpl.stop: ", e); |
| 285 | + logger.log(Level.TRACE, "connections closed"); |
| 286 | + |
| 287 | + if (dispatcherThread != null && dispatcherThread != Thread.currentThread()) { |
| 288 | + logger.log(Level.TRACE, "waiting for dispatcher thread"); |
| 289 | + try { |
| 290 | + dispatcherThread.join(); |
| 291 | + } catch (InterruptedException e) { |
| 292 | + Thread.currentThread().interrupt(); |
| 293 | + logger.log(Level.TRACE, "ServerImpl.stop: ", e); |
| 294 | + } |
264 | 295 | } |
| 296 | + logger.log(Level.TRACE, "server stopped"); |
265 | 297 | } |
266 | 298 | } |
267 | 299 |
|
@@ -391,15 +423,34 @@ void addEvent (Event r) { |
391 | 423 | class Dispatcher implements Runnable { |
392 | 424 |
|
393 | 425 | private void handleEvent (Event r) { |
| 426 | + |
| 427 | + // Stopping marking the state as finished if stop is requested, |
| 428 | + // termination is in progress and exchange count is 0 |
| 429 | + if (r instanceof Event.StopRequested) { |
| 430 | + logger.log(Level.TRACE, "Handling Stop Requested Event"); |
| 431 | + |
| 432 | + // checking if terminating is set to true |
| 433 | + final boolean terminatingCopy = terminating; |
| 434 | + assert terminatingCopy; |
| 435 | + |
| 436 | + if (getExchangeCount() == 0 && reqConnections.isEmpty()) { |
| 437 | + finishedLatch.countDown(); |
| 438 | + } else { |
| 439 | + logger.log(Level.TRACE, "Some requests are still pending"); |
| 440 | + } |
| 441 | + return; |
| 442 | + } |
| 443 | + |
394 | 444 | ExchangeImpl t = r.exchange; |
395 | 445 | HttpConnection c = t.getConnection(); |
| 446 | + |
396 | 447 | try { |
397 | | - if (r instanceof WriteFinishedEvent) { |
| 448 | + if (r instanceof Event.WriteFinished) { |
398 | 449 |
|
399 | 450 | logger.log(Level.TRACE, "Write Finished"); |
400 | 451 | int exchanges = endExchange(); |
401 | | - if (terminating && exchanges == 0) { |
402 | | - finished = true; |
| 452 | + if (terminating && exchanges == 0 && reqConnections.isEmpty()) { |
| 453 | + finishedLatch.countDown(); |
403 | 454 | } |
404 | 455 | LeftOverInputStream is = t.getOriginalInputStream(); |
405 | 456 | if (!is.isEOF()) { |
@@ -449,11 +500,12 @@ void reRegister (HttpConnection c) { |
449 | 500 | } |
450 | 501 |
|
451 | 502 | public void run() { |
452 | | - while (!finished) { |
| 503 | + // finished() will be true when there are no active exchange after terminating |
| 504 | + while (!finished()) { |
453 | 505 | try { |
454 | 506 | List<Event> list = null; |
455 | 507 | synchronized (lolock) { |
456 | | - if (events.size() > 0) { |
| 508 | + if (!events.isEmpty()) { |
457 | 509 | list = events; |
458 | 510 | events = new ArrayList<>(); |
459 | 511 | } |
@@ -600,18 +652,18 @@ private void closeConnection(HttpConnection conn) { |
600 | 652 | conn.close(); |
601 | 653 | allConnections.remove(conn); |
602 | 654 | switch (conn.getState()) { |
603 | | - case REQUEST: |
604 | | - reqConnections.remove(conn); |
605 | | - break; |
606 | | - case RESPONSE: |
607 | | - rspConnections.remove(conn); |
608 | | - break; |
609 | | - case IDLE: |
610 | | - idleConnections.remove(conn); |
611 | | - break; |
612 | | - case NEWLY_ACCEPTED: |
613 | | - newlyAcceptedConnections.remove(conn); |
614 | | - break; |
| 655 | + case REQUEST: |
| 656 | + reqConnections.remove(conn); |
| 657 | + break; |
| 658 | + case RESPONSE: |
| 659 | + rspConnections.remove(conn); |
| 660 | + break; |
| 661 | + case IDLE: |
| 662 | + idleConnections.remove(conn); |
| 663 | + break; |
| 664 | + case NEWLY_ACCEPTED: |
| 665 | + newlyAcceptedConnections.remove(conn); |
| 666 | + break; |
615 | 667 | } |
616 | 668 | assert !reqConnections.remove(conn); |
617 | 669 | assert !rspConnections.remove(conn); |
@@ -933,19 +985,16 @@ void logReply (int code, String requestStr, String text) { |
933 | 985 | logger.log (Level.DEBUG, message); |
934 | 986 | } |
935 | 987 |
|
936 | | - void delay () { |
937 | | - Thread.yield(); |
938 | | - try { |
939 | | - Thread.sleep (200); |
940 | | - } catch (InterruptedException e) {} |
941 | | - } |
942 | | - |
943 | 988 | private int exchangeCount = 0; |
944 | 989 |
|
945 | 990 | synchronized void startExchange () { |
946 | 991 | exchangeCount ++; |
947 | 992 | } |
948 | 993 |
|
| 994 | + synchronized int getExchangeCount() { |
| 995 | + return exchangeCount; |
| 996 | + } |
| 997 | + |
949 | 998 | synchronized int endExchange () { |
950 | 999 | exchangeCount --; |
951 | 1000 | assert exchangeCount >= 0; |
|
0 commit comments