|
76 | 76 | import java.util.Optional;
|
77 | 77 | import java.util.Set;
|
78 | 78 | import java.util.concurrent.ThreadLocalRandom;
|
| 79 | +import java.util.concurrent.TimeoutException; |
79 | 80 | import java.util.concurrent.atomic.AtomicReference;
|
80 | 81 | import java.util.concurrent.locks.Lock;
|
81 | 82 | import java.util.concurrent.locks.ReadWriteLock;
|
@@ -363,9 +364,12 @@ private DistributedQueryRunner(
|
363 | 364 | prestoClients = prestoClientsBuilder.build();
|
364 | 365 |
|
365 | 366 | long start = nanoTime();
|
366 |
| - while (!allNodesGloballyVisible()) { |
367 |
| - Assertions.assertLessThan(nanosSince(start), new Duration(100, SECONDS)); |
368 |
| - MILLISECONDS.sleep(10); |
| 367 | + try { |
| 368 | + waitForAllNodesGloballyVisible(start); |
| 369 | + } |
| 370 | + catch (TimeoutException e) { |
| 371 | + closer.close(); |
| 372 | + throw e; |
369 | 373 | }
|
370 | 374 | log.info("Announced servers in %s", nanosSince(start).convertToMostSuccinctTimeUnit());
|
371 | 375 |
|
@@ -517,22 +521,44 @@ else if (coordinatorSidecar) {
|
517 | 521 | return server;
|
518 | 522 | }
|
519 | 523 |
|
520 |
| - private boolean allNodesGloballyVisible() |
| 524 | + private void waitForAllNodesGloballyVisible(long startTimeInMs) |
| 525 | + throws Exception |
521 | 526 | {
|
522 |
| - int expectedActiveNodesForRm = externalWorkers.size() + servers.size(); |
523 |
| - int expectedActiveNodesForCoordinator = externalWorkers.size() + servers.size(); |
| 527 | + int expectedActiveNodes = externalWorkers.size() + servers.size(); |
| 528 | + Duration timeout = new Duration(100, SECONDS); |
524 | 529 |
|
525 |
| - for (TestingPrestoServer server : servers) { |
| 530 | + for (int serverIndex = 0; serverIndex < servers.size(); ) { |
| 531 | + TestingPrestoServer server = servers.get(serverIndex); |
526 | 532 | AllNodes allNodes = server.refreshNodes();
|
527 | 533 | int activeNodeCount = allNodes.getActiveNodes().size();
|
528 | 534 |
|
529 |
| - if (!allNodes.getInactiveNodes().isEmpty() || |
530 |
| - (server.isCoordinator() && activeNodeCount != expectedActiveNodesForCoordinator) || |
531 |
| - (server.isResourceManager() && activeNodeCount != expectedActiveNodesForRm)) { |
532 |
| - return false; |
| 535 | + if (!allNodes.getInactiveNodes().isEmpty()) { |
| 536 | + throwTimeoutIfNotReady( |
| 537 | + startTimeInMs, |
| 538 | + timeout, |
| 539 | + format("Timed out waiting for all nodes to be globally visible. Inactive nodes: %s", allNodes.getInactiveNodes())); |
| 540 | + MILLISECONDS.sleep(10); |
| 541 | + } |
| 542 | + else if ((server.isCoordinator() || server.isResourceManager()) && activeNodeCount != expectedActiveNodes) { |
| 543 | + throwTimeoutIfNotReady( |
| 544 | + startTimeInMs, |
| 545 | + timeout, |
| 546 | + format("Timed out waiting for all nodes to be globally visible. Node count: %s, expected: %s", activeNodeCount, expectedActiveNodes)); |
| 547 | + MILLISECONDS.sleep(10); |
| 548 | + } |
| 549 | + else { |
| 550 | + log.info("Server %s has %s active nodes", server.getBaseUrl(), activeNodeCount); |
| 551 | + serverIndex++; |
533 | 552 | }
|
534 | 553 | }
|
535 |
| - return true; |
| 554 | + } |
| 555 | + |
| 556 | + private static void throwTimeoutIfNotReady(long startTimeInMs, Duration timeout, String message) |
| 557 | + throws TimeoutException |
| 558 | + { |
| 559 | + if (nanosSince(startTimeInMs).compareTo(timeout) >= 0) { |
| 560 | + throw new TimeoutException(format(message)); |
| 561 | + } |
536 | 562 | }
|
537 | 563 |
|
538 | 564 | public TestingPrestoClient getRandomClient()
|
|
0 commit comments