|
76 | 76 | import java.util.Optional;
|
77 | 77 | import java.util.Set;
|
78 | 78 | import java.util.concurrent.ThreadLocalRandom;
|
| 79 | +import java.util.concurrent.TimeoutException; |
79 | 80 | import java.util.concurrent.atomic.AtomicReference;
|
80 | 81 | import java.util.concurrent.locks.Lock;
|
81 | 82 | import java.util.concurrent.locks.ReadWriteLock;
|
@@ -363,9 +364,12 @@ private DistributedQueryRunner(
|
363 | 364 | prestoClients = prestoClientsBuilder.build();
|
364 | 365 |
|
365 | 366 | long start = nanoTime();
|
366 |
| - while (!allNodesGloballyVisible()) { |
367 |
| - Assertions.assertLessThan(nanosSince(start), new Duration(100, SECONDS)); |
368 |
| - MILLISECONDS.sleep(10); |
| 367 | + try { |
| 368 | + waitForAllNodesGloballyVisible(start); |
| 369 | + } |
| 370 | + catch (TimeoutException e) { |
| 371 | + closer.close(); |
| 372 | + throw e; |
369 | 373 | }
|
370 | 374 | log.info("Announced servers in %s", nanosSince(start).convertToMostSuccinctTimeUnit());
|
371 | 375 |
|
@@ -517,22 +521,35 @@ else if (coordinatorSidecar) {
|
517 | 521 | return server;
|
518 | 522 | }
|
519 | 523 |
|
520 |
| - private boolean allNodesGloballyVisible() |
| 524 | + private void waitForAllNodesGloballyVisible(long startTimeInMs) |
| 525 | + throws Exception |
521 | 526 | {
|
522 |
| - int expectedActiveNodesForRm = externalWorkers.size() + servers.size(); |
523 |
| - int expectedActiveNodesForCoordinator = externalWorkers.size() + servers.size(); |
| 527 | + int expectedActiveNodes = externalWorkers.size() + servers.size(); |
| 528 | + Duration timeout = new Duration(100, SECONDS); |
524 | 529 |
|
525 |
| - for (TestingPrestoServer server : servers) { |
526 |
| - AllNodes allNodes = server.refreshNodes(); |
527 |
| - int activeNodeCount = allNodes.getActiveNodes().size(); |
| 530 | + while (true) { |
| 531 | + for (TestingPrestoServer server : servers) { |
| 532 | + AllNodes allNodes = server.refreshNodes(); |
| 533 | + int activeNodeCount = allNodes.getActiveNodes().size(); |
528 | 534 |
|
529 |
| - if (!allNodes.getInactiveNodes().isEmpty() || |
530 |
| - (server.isCoordinator() && activeNodeCount != expectedActiveNodesForCoordinator) || |
531 |
| - (server.isResourceManager() && activeNodeCount != expectedActiveNodesForRm)) { |
532 |
| - return false; |
| 535 | + if (!allNodes.getInactiveNodes().isEmpty()) { |
| 536 | + if (nanosSince(startTimeInMs).compareTo(timeout) >= 0) { |
| 537 | + throw new TimeoutException(format("Timed out waiting for all nodes to be globally visible. Inactive nodes: %s", allNodes.getInactiveNodes())); |
| 538 | + } |
| 539 | + break; |
| 540 | + } |
| 541 | + else if ((server.isCoordinator() || server.isResourceManager()) && activeNodeCount != expectedActiveNodes) { |
| 542 | + if (nanosSince(startTimeInMs).compareTo(timeout) >= 0) { |
| 543 | + throw new TimeoutException(format( |
| 544 | + "Timed out waiting for all nodes to be globally visible. Node count: %s, expected: %s", |
| 545 | + activeNodeCount, expectedActiveNodes)); |
| 546 | + } |
| 547 | + break; |
| 548 | + } |
| 549 | + return; |
533 | 550 | }
|
| 551 | + MILLISECONDS.sleep(10); |
534 | 552 | }
|
535 |
| - return true; |
536 | 553 | }
|
537 | 554 |
|
538 | 555 | public TestingPrestoClient getRandomClient()
|
|
0 commit comments