Commit fb81e74

Fix another race in getWorkerFor. (#18918)
This patch covers another case beyond the one covered in #18910. A race remained when multiple workers were running and all trying to contact each other: worker A could try to contact worker B before worker B had been set up. The fix is to have the base-class newWorker return null rather than throw an error, and to update getWorkerFor to retry on null.
1 parent f204289 commit fb81e74

1 file changed

Lines changed: 7 additions & 4 deletions

multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerClient.java

@@ -61,11 +61,13 @@ public ListenableFuture<Void> postWorkOrder(String workerTaskId, WorkOrder workO
 
   protected Worker getWorkerFor(String workerTaskId)
   {
-    final WorkerRunRef workerRunRef = inMemoryWorkers.computeIfAbsent(workerTaskId, this::newWorker);
     final Stopwatch stopwatch = Stopwatch.createStarted();
 
-    // Wait for the worker to exist
-    while (!workerRunRef.hasWorker()) {
+    // Wait for the worker to exist. It may not have been created or started up yet, especially if this is
+    // a worker trying to contact another worker.
+    WorkerRunRef workerRunRef;
+    while ((workerRunRef = inMemoryWorkers.computeIfAbsent(workerTaskId, this::newWorker)) == null
+           || !workerRunRef.hasWorker()) {
       if (stopwatch.millisElapsed() > WORKER_WAIT_TIMEOUT_MS) {
         throw new ISE(
             "Timed out after [%,d]ms waiting for worker[%s] to be registered",
@@ -88,7 +90,8 @@ protected Worker getWorkerFor(String workerTaskId)
 
   protected WorkerRunRef newWorker(String workerId)
   {
-    throw new RuntimeException("Not implemented!");
+    // Return null so getWorkerFor waits for inMemoryWorkers to be populated
+    return null;
   }
 
   @Override
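
The retry in getWorkerFor relies on a documented property of Map.computeIfAbsent: when the mapping function returns null, no mapping is recorded and the call itself returns null, so a later call can still observe an entry that another thread added in the meantime. The following is a minimal standalone sketch of that pattern, not code from the Druid test client. The class name RetryOnNullSketch, the String-valued registry, the 5-second timeout, and the 10 ms sleep between attempts are all illustrative assumptions.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: names and values here are illustrative only.
class RetryOnNullSketch
{
  // Stand-in for a registry of in-memory workers (assumed to be a concurrent map).
  static final Map<String, String> registry = new ConcurrentHashMap<>();
  static final long TIMEOUT_MS = 5_000;

  // Like the patched base-class newWorker: creates nothing and returns null,
  // so computeIfAbsent records no mapping for the key.
  static String newWorker(String workerId)
  {
    return null;
  }

  // Retries until some other thread has registered the worker, or the timeout elapses.
  static String getWorkerFor(String workerId)
  {
    final long start = System.nanoTime();
    String worker;
    while ((worker = registry.computeIfAbsent(workerId, RetryOnNullSketch::newWorker)) == null) {
      final long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
      if (elapsedMs > TIMEOUT_MS) {
        throw new IllegalStateException("Timed out waiting for worker " + workerId);
      }
      try {
        Thread.sleep(10); // assumed back-off between attempts
      }
      catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Interrupted while waiting for worker " + workerId, e);
      }
    }
    return worker;
  }

  public static void main(String[] args) throws Exception
  {
    // Simulates worker B being set up shortly after worker A starts looking for it.
    Thread registrar = new Thread(() -> {
      try {
        Thread.sleep(100);
      }
      catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
      registry.put("worker-B", "B is ready");
    });
    registrar.start();
    System.out.println(getWorkerFor("worker-B"));
    registrar.join();
  }
}
```

Before the patch, the lookup ran once ahead of the loop and the base-class newWorker threw, so a caller that arrived before registration failed immediately instead of waiting. Moving the lookup into the loop condition is what lets the caller pick up a late registration.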