From 6b8d82d7dbc8e8822b94929b7dfe5b5b05e2f988 Mon Sep 17 00:00:00 2001 From: Trevor Clinkenbeard Date: Tue, 17 Mar 2026 21:01:23 +0000 Subject: [PATCH 1/7] Convert TaskBucket.actor.cpp to standard coroutines --- fdbclient/TaskBucket.actor.cpp | 586 +++++++++++++++++---------------- 1 file changed, 296 insertions(+), 290 deletions(-) diff --git a/fdbclient/TaskBucket.actor.cpp b/fdbclient/TaskBucket.actor.cpp index d3d619c84b8..b65f081ed05 100644 --- a/fdbclient/TaskBucket.actor.cpp +++ b/fdbclient/TaskBucket.actor.cpp @@ -44,22 +44,22 @@ struct UnblockFutureTaskFunc : TaskFuncBase { return _finish(tr, tb, fb, task); }; - ACTOR static Future _finish(Reference tr, - Reference taskBucket, - Reference futureBucket, - Reference task) { - state Reference future = futureBucket->unpack(task->params[Task::reservedTaskParamKeyFuture]); + static Future _finish(Reference tr, + Reference taskBucket, + Reference futureBucket, + Reference task) { + Reference future = futureBucket->unpack(task->params[Task::reservedTaskParamKeyFuture]); futureBucket->setOptions(tr); tr->clear(future->blocks.pack(task->params[Task::reservedTaskParamKeyBlockID])); - bool is_set = wait(future->isSet(tr)); + bool is_set = co_await future->isSet(tr); if (is_set) { - wait(future->performAllActions(tr, taskBucket)); + co_await future->performAllActions(tr, taskBucket); } - return Void(); + co_return; } }; StringRef UnblockFutureTaskFunc::name = "UnblockFuture"_sr; @@ -159,36 +159,35 @@ unsigned int Task::getPriority() const { class TaskBucketImpl { public: - ACTOR static Future> getTaskKey(Reference tr, - Reference taskBucket, - int priority = 0) { + static Future> getTaskKey(Reference tr, + Reference taskBucket, + int priority = 0) { Standalone uid = StringRef(deterministicRandom()->randomUniqueID().toString()); // Get keyspace for the specified priority level - state Subspace space = taskBucket->getAvailableSpace(priority); + Subspace space = taskBucket->getAvailableSpace(priority); { // Get a task key that is <= a random UID task key, if successful then return it RangeResult value = - wait(tr->getRange(KeyRangeRef(space.key(), space.pack(uid)), 1, Snapshot::True, Reverse::True)); + co_await tr->getRange(KeyRangeRef(space.key(), space.pack(uid)), 1, Snapshot::True, Reverse::True); if (!value.empty()) { - return Optional(value[0].key); + co_return Optional(value[0].key); } } { // Get a task key that is <= the maximum possible UID, if successful return it. - RangeResult value = - wait(tr->getRange(KeyRangeRef(space.key(), space.pack(maxUIDKey)), 1, Snapshot::True, Reverse::True)); + RangeResult value = co_await tr->getRange( + KeyRangeRef(space.key(), space.pack(maxUIDKey)), 1, Snapshot::True, Reverse::True); if (!value.empty()) { - return Optional(value[0].key); + co_return Optional(value[0].key); } } - return Optional(); + co_return Optional(); } - ACTOR static Future> getOne(Reference tr, - Reference taskBucket) { + static Future> getOne(Reference tr, Reference taskBucket) { if (taskBucket->priority_batch) tr->setOption(FDBTransactionOptions::PRIORITY_BATCH); @@ -197,20 +196,20 @@ class TaskBucketImpl { // give it some chances for the timed out tasks to get into the task loop in the case of // many other new tasks get added so that the timed out tasks never get chances to re-run if (deterministicRandom()->random01() < CLIENT_KNOBS->TASKBUCKET_CHECK_TIMEOUT_CHANCE) { - bool anyTimeouts = wait(requeueTimedOutTasks(tr, taskBucket)); + bool anyTimeouts = co_await requeueTimedOutTasks(tr, taskBucket); CODE_PROBE(anyTimeouts, "Found a task that timed out"); } - state std::vector>> taskKeyFutures(CLIENT_KNOBS->TASKBUCKET_MAX_PRIORITY + 1); + std::vector>> taskKeyFutures(CLIENT_KNOBS->TASKBUCKET_MAX_PRIORITY + 1); // Start looking for a task at each priority, highest first - state int pri; + int pri{ 0 }; for (pri = CLIENT_KNOBS->TASKBUCKET_MAX_PRIORITY; pri >= 0; --pri) taskKeyFutures[pri] = getTaskKey(tr, taskBucket, pri); // Task key and subspace it is located in. - state Optional taskKey; - state Subspace availableSpace; + Optional taskKey; + Subspace availableSpace; // In priority order from highest to lowest, wait for fetch to finish and if it found a task then cancel the // rest. @@ -219,7 +218,7 @@ class TaskBucketImpl { if (taskKey.present()) taskKeyFutures[pri].cancel(); else { - Optional key = wait(taskKeyFutures[pri]); + Optional key = co_await taskKeyFutures[pri]; if (key.present()) { taskKey = key; availableSpace = taskBucket->getAvailableSpace(pri); @@ -229,26 +228,26 @@ class TaskBucketImpl { // If we don't have a task key, requeue timed out tasks and try again by calling self. if (!taskKey.present()) { - bool anyTimeouts = wait(requeueTimedOutTasks(tr, taskBucket)); + bool anyTimeouts = co_await requeueTimedOutTasks(tr, taskBucket); // If there were timeouts, try to get a task since there should now be one in one of the available spaces. if (anyTimeouts) { CODE_PROBE(true, "Try to get one task from timeouts subspace"); - Reference task = wait(getOne(tr, taskBucket)); - return task; + Reference task = co_await getOne(tr, taskBucket); + co_return task; } - return Reference(); + co_return Reference(); } // Now we know the task key is present and we have the available space for the task's priority - state Tuple t = availableSpace.unpack(taskKey.get()); - state Key taskUID = t.getString(0); - state Subspace taskAvailableSpace = availableSpace.get(taskUID); + Tuple t = availableSpace.unpack(taskKey.get()); + Key taskUID = t.getString(0); + Subspace taskAvailableSpace = availableSpace.get(taskUID); - state Reference task(new Task()); + Reference task(new Task()); task->key = taskUID; - state RangeResult values = wait(tr->getRange(taskAvailableSpace.range(), CLIENT_KNOBS->TOO_MANY)); - Version version = wait(tr->getReadVersion()); + RangeResult values = co_await tr->getRange(taskAvailableSpace.range(), CLIENT_KNOBS->TOO_MANY); + Version version = co_await tr->getReadVersion(); task->timeoutVersion = version + (uint64_t)(taskBucket->timeout * (CLIENT_KNOBS->TASKBUCKET_TIMEOUT_JITTER_OFFSET + @@ -266,19 +265,19 @@ class TaskBucketImpl { tr->clear(taskAvailableSpace.range()); tr->set(taskBucket->active.key(), deterministicRandom()->randomUniqueID().toString()); - return task; + co_return task; } // Verify that the user configured task verification key still has the user specified value - ACTOR static Future taskVerify(Reference tb, - Reference tr, - Reference task) { + static Future taskVerify(Reference tb, + Reference tr, + Reference task) { if (task->params.find(Task::reservedTaskParamValidKey) == task->params.end()) { TraceEvent("TaskBucketTaskVerifyInvalidTask") .detail("Task", task->params[Task::reservedTaskParamKeyType]) .detail("ReservedTaskParamValidKey", "missing"); - return false; + co_return false; } if (task->params.find(Task::reservedTaskParamValidValue) == task->params.end()) { @@ -286,12 +285,12 @@ class TaskBucketImpl { .detail("Task", task->params[Task::reservedTaskParamKeyType]) .detail("ReservedTaskParamValidKey", task->params[Task::reservedTaskParamValidKey]) .detail("ReservedTaskParamValidValue", "missing"); - return false; + co_return false; } tb->setOptions(tr); - Optional keyValue = wait(tr->get(task->params[Task::reservedTaskParamValidKey])); + Optional keyValue = co_await tr->get(task->params[Task::reservedTaskParamValidKey]); if (!keyValue.present()) { TraceEvent("TaskBucketTaskVerifyInvalidTask") @@ -299,7 +298,7 @@ class TaskBucketImpl { .detail("ReservedTaskParamValidKey", task->params[Task::reservedTaskParamValidKey]) .detail("ReservedTaskParamValidValue", task->params[Task::reservedTaskParamValidValue]) .detail("KeyValue", "missing"); - return false; + co_return false; } if (keyValue.get().compare(StringRef(task->params[Task::reservedTaskParamValidValue]))) { @@ -308,74 +307,72 @@ class TaskBucketImpl { .detail("ReservedTaskParamValidKey", task->params[Task::reservedTaskParamValidKey]) .detail("ReservedTaskParamValidValue", task->params[Task::reservedTaskParamValidValue]) .detail("KeyValue", keyValue.get()); - return false; + co_return false; } - return true; + co_return true; } - ACTOR static Future taskVerify(Reference tb, Database cx, Reference task) { + static Future taskVerify(Reference tb, Database cx, Reference task) { loop { - state Reference tr(new ReadYourWritesTransaction(cx)); - + Reference tr(new ReadYourWritesTransaction(cx)); + Error err; try { - bool verified = wait(taskVerify(tb, tr, task)); - return verified; + bool verified = co_await taskVerify(tb, tr, task); + co_return verified; } catch (Error& e) { - wait(tr->onError(e)); + err = e; } + co_await tr->onError(err); } } - ACTOR static Future finishTaskRun(Reference tr, - Reference taskBucket, - Reference futureBucket, - Reference task, - Reference taskFunc, - VerifyTask verifyTask) { - bool isFinished = wait(taskBucket->isFinished(tr, task)); + static Future finishTaskRun(Reference tr, + Reference taskBucket, + Reference futureBucket, + Reference task, + Reference taskFunc, + VerifyTask verifyTask) { + bool isFinished = co_await taskBucket->isFinished(tr, task); if (isFinished) { - return Void(); + co_return; } - state bool validTask = true; + bool validTask = true; if (verifyTask) { - bool _validTask = wait(taskVerify(taskBucket, tr, task)); + bool _validTask = co_await taskVerify(taskBucket, tr, task); validTask = _validTask; } if (!validTask) { - wait(taskBucket->finish(tr, task)); + co_await taskBucket->finish(tr, task); } else { - wait(taskFunc->finish(tr, taskBucket, futureBucket, task)); + co_await taskFunc->finish(tr, taskBucket, futureBucket, task); } - return Void(); + co_return; } - ACTOR static Future doOne(Database cx, - Reference taskBucket, - Reference futureBucket) { - state Reference task = wait(taskBucket->getOne(cx)); - bool result = wait(taskBucket->doTask(cx, futureBucket, task)); - return result; + static Future doOne(Database cx, Reference taskBucket, Reference futureBucket) { + Reference task = co_await taskBucket->getOne(cx); + bool result = co_await taskBucket->doTask(cx, futureBucket, task); + co_return result; } - ACTOR static Future extendTimeoutRepeatedly(Database cx, - Reference taskBucket, - Reference task) { - state Reference tr(new ReadYourWritesTransaction(cx)); - state double start = now(); - state Version versionNow = wait(runRYWTransaction(cx, [=](Reference tr) { + static Future extendTimeoutRepeatedly(Database cx, Reference taskBucket, Reference task) { + Reference tr(new ReadYourWritesTransaction(cx)); + double start = now(); + Version versionNow = co_await runRYWTransaction(cx, [=](Reference tr) { taskBucket->setOptions(tr); return map(tr->getReadVersion(), [=](Version v) { return v; }); - })); + }); loop { - state FlowLock::Releaser releaser; + FlowLock::Releaser releaser; // Wait until we are half way to the timeout version of this task - wait(delay(0.8 * (BUGGIFY ? (2 * deterministicRandom()->random01()) : 1.0) * - (double)(task->timeoutVersion - (uint64_t)versionNow) / CLIENT_KNOBS->CORE_VERSIONSPERSECOND)); + co_await delay(0.8 * (BUGGIFY ? (2 * deterministicRandom()->random01()) : 1.0) * + (double)(task->timeoutVersion - (uint64_t)versionNow) / + CLIENT_KNOBS->CORE_VERSIONSPERSECOND); if (now() - start > 300) { TraceEvent(SevWarnAlways, "TaskBucketLongExtend") @@ -385,37 +382,40 @@ class TaskBucketImpl { .detail("Priority", task->getPriority()); } // Take the extendMutex lock until we either succeed or stop trying to extend due to failure - wait(task->extendMutex.take()); + co_await task->extendMutex.take(); releaser = FlowLock::Releaser(task->extendMutex, 1); loop { + Error err; try { tr->reset(); taskBucket->setOptions(tr); // Attempt to extend the task's timeout - state Version newTimeout = wait(taskBucket->extendTimeout(tr, task, UpdateParams::False)); - wait(tr->commit()); + Version newTimeout = co_await taskBucket->extendTimeout(tr, task, UpdateParams::False); + co_await tr->commit(); task->timeoutVersion = newTimeout; versionNow = tr->getCommittedVersion(); break; } catch (Error& e) { - wait(tr->onError(e)); + err = e; } + co_await tr->onError(err); } } } - ACTOR static Future doTask(Database cx, - Reference taskBucket, - Reference futureBucket, - Reference task) { - state Reference taskFunc; - state VerifyTask verifyTask(false); + static Future doTask(Database cx, + Reference taskBucket, + Reference futureBucket, + Reference task) { + Reference taskFunc; + VerifyTask verifyTask(false); if (!task || !TaskFuncBase::isValidTask(task)) - return false; + co_return false; + Optional err; try { taskFunc = TaskFuncBase::create(task->params[Task::reservedTaskParamKeyType]); if (taskFunc) { @@ -423,75 +423,78 @@ class TaskBucketImpl { if (verifyTask) { loop { - state Reference tr(new ReadYourWritesTransaction(cx)); + Reference tr(new ReadYourWritesTransaction(cx)); taskBucket->setOptions(tr); - + Error err; try { - bool validTask = wait(taskVerify(taskBucket, tr, task)); + bool validTask = co_await taskVerify(taskBucket, tr, task); if (!validTask) { - bool isFinished = wait(taskBucket->isFinished(tr, task)); + bool isFinished = co_await taskBucket->isFinished(tr, task); if (!isFinished) { - wait(taskBucket->finish(tr, task)); + co_await taskBucket->finish(tr, task); } - wait(tr->commit()); - return true; + co_await tr->commit(); + co_return true; } break; } catch (Error& e) { - wait(tr->onError(e)); + err = e; } + co_await tr->onError(err); } } - wait(taskFunc->execute(cx, taskBucket, futureBucket, task) || - extendTimeoutRepeatedly(cx, taskBucket, task)); + co_await (taskFunc->execute(cx, taskBucket, futureBucket, task) || + extendTimeoutRepeatedly(cx, taskBucket, task)); if (BUGGIFY) - wait(delay(10.0)); - wait(runRYWTransaction(cx, [=](Reference tr) { + co_await delay(10.0); + co_await runRYWTransaction(cx, [=](Reference tr) { return finishTaskRun(tr, taskBucket, futureBucket, task, taskFunc, verifyTask); - })); + }); } + co_return true; } catch (Error& e) { - TraceEvent(SevWarn, "TaskBucketExecuteFailure") - .error(e) - .detail("TaskUID", task->key) + err = e; + } + TraceEvent(SevWarn, "TaskBucketExecuteFailure") + .error(err.get()) + .detail("TaskUID", task->key) + .detail("TaskType", task->params[Task::reservedTaskParamKeyType].printable()) + .detail("Priority", task->getPriority()); + try { + co_await taskFunc->handleError(cx, task, err.get()); + } catch (Error& handleErr) { + TraceEvent(SevWarn, "TaskBucketExecuteFailureLogErrorFailed") + .error(handleErr) // output handleError() error instead of original task error + .detail("TaskUID", task->key.printable()) .detail("TaskType", task->params[Task::reservedTaskParamKeyType].printable()) .detail("Priority", task->getPriority()); - try { - wait(taskFunc->handleError(cx, task, e)); - } catch (Error& e) { - TraceEvent(SevWarn, "TaskBucketExecuteFailureLogErrorFailed") - .error(e) // output handleError() error instead of original task error - .detail("TaskUID", task->key.printable()) - .detail("TaskType", task->params[Task::reservedTaskParamKeyType].printable()) - .detail("Priority", task->getPriority()); - } } // Return true to indicate that we did work. - return true; + co_return true; } - ACTOR static Future dispatch(Database cx, - Reference taskBucket, - Reference futureBucket, - std::shared_ptr pollDelay, - int maxConcurrentTasks) { - state std::vector> tasks(maxConcurrentTasks); + static Future dispatch(Database cx, + Reference taskBucket, + Reference futureBucket, + std::shared_ptr pollDelay, + int maxConcurrentTasks) { + std::vector> tasks(maxConcurrentTasks); for (auto& f : tasks) f = Never(); // Since the futures have to be kept in a vector to be compatible with waitForAny(), we'll keep a queue // of available slots in it. Initially, they're all available. - state std::vector availableSlots; + std::vector availableSlots; availableSlots.reserve(tasks.size()); for (int i = 0; i < tasks.size(); ++i) availableSlots.push_back(i); - state std::vector>> getTasks; - state unsigned int getBatchSize = 1; + std::vector>> getTasks; + unsigned int getBatchSize = 1; loop { // Start running tasks while slots are available and we keep finding work to do @@ -500,7 +503,7 @@ class TaskBucketImpl { getTasks.clear(); for (int i = 0, imax = std::min(getBatchSize, availableSlots.size()); i < imax; ++i) getTasks.push_back(taskBucket->getOne(cx)); - wait(waitForAllReady(getTasks)); + co_await waitForAllReady(getTasks); bool done = false; for (int i = 0; i < getTasks.size(); ++i) { @@ -540,7 +543,7 @@ class TaskBucketImpl { w = w || delay(*pollDelay * (0.9 + deterministicRandom()->random01() / 5)); // Jittered by 20 %, so +/- 10% } - wait(w); + co_await w; // Check all of the task slots, any that are finished should be replaced with Never() and their slots added // back to availableSlots @@ -553,40 +556,40 @@ class TaskBucketImpl { } } - ACTOR static Future watchPaused(Database cx, - Reference taskBucket, - Reference> paused) { + static Future watchPaused(Database cx, Reference taskBucket, Reference> paused) { loop { - state Reference tr(new ReadYourWritesTransaction(cx)); + Reference tr(new ReadYourWritesTransaction(cx)); + Error err; try { taskBucket->setOptions(tr); - Optional pausedVal = wait(tr->get(taskBucket->pauseKey)); + Optional pausedVal = co_await tr->get(taskBucket->pauseKey); paused->set(pausedVal.present()); - state Future watchPausedFuture = tr->watch(taskBucket->pauseKey); - wait(tr->commit()); - wait(watchPausedFuture); + Future watchPausedFuture = tr->watch(taskBucket->pauseKey); + co_await tr->commit(); + co_await watchPausedFuture; } catch (Error& e) { - wait(tr->onError(e)); + err = e; } + co_await tr->onError(err); } } - ACTOR static Future run(Database cx, - Reference taskBucket, - Reference futureBucket, - std::shared_ptr pollDelay, - int maxConcurrentTasks) { - state Reference> paused = makeReference>(true); - state Future watchPausedFuture = watchPaused(cx, taskBucket, paused); + static Future run(Database cx, + Reference taskBucket, + Reference futureBucket, + std::shared_ptr pollDelay, + int maxConcurrentTasks) { + Reference> paused = makeReference>(true); + Future watchPausedFuture = watchPaused(cx, taskBucket, paused); taskBucket->metricLogger = taskBucket->cc.traceCounters( "TaskBucketMetrics", taskBucket->dbgid, CLIENT_KNOBS->TASKBUCKET_LOGGING_DELAY); loop { while (paused->get()) { - wait(paused->onChange() || watchPausedFuture); + co_await (paused->onChange() || watchPausedFuture); } - wait(dispatch(cx, taskBucket, futureBucket, pollDelay, maxConcurrentTasks) || paused->onChange() || - watchPausedFuture); + co_await (dispatch(cx, taskBucket, futureBucket, pollDelay, maxConcurrentTasks) || paused->onChange() || + watchPausedFuture); } } @@ -602,148 +605,151 @@ class TaskBucketImpl { return runRYWTransaction(cx, [=](Reference tr) { return addIdle(tr, taskBucket); }); } - ACTOR static Future isEmpty(Reference tr, Reference taskBucket) { + static Future isEmpty(Reference tr, Reference taskBucket) { taskBucket->setOptions(tr); // Check all available priorities for keys - state std::vector> resultFutures; + std::vector> resultFutures; for (int pri = 0; pri <= CLIENT_KNOBS->TASKBUCKET_MAX_PRIORITY; ++pri) resultFutures.push_back(tr->getRange(taskBucket->getAvailableSpace(pri).range(), 1)); // If any priority levels have any keys then the taskbucket is not empty so return false - state int i; + int i{ 0 }; for (i = 0; i < resultFutures.size(); ++i) { - RangeResult results = wait(resultFutures[i]); + RangeResult results = co_await resultFutures[i]; if (results.size() > 0) - return false; + co_return false; } - RangeResult values = wait(tr->getRange(taskBucket->timeouts.range(), 1)); + RangeResult values = co_await tr->getRange(taskBucket->timeouts.range(), 1); if (values.size() > 0) - return false; + co_return false; - return true; + co_return true; } - ACTOR static Future isBusy(Reference tr, Reference taskBucket) { + static Future isBusy(Reference tr, Reference taskBucket) { taskBucket->setOptions(tr); // Check all available priorities for emptiness - state std::vector> resultFutures; + std::vector> resultFutures; for (int pri = 0; pri <= CLIENT_KNOBS->TASKBUCKET_MAX_PRIORITY; ++pri) resultFutures.push_back(tr->getRange(taskBucket->getAvailableSpace(pri).range(), 1)); // If any priority levels have any keys then return true as the level is 'busy' - state int i; + int i{ 0 }; for (i = 0; i < resultFutures.size(); ++i) { - RangeResult results = wait(resultFutures[i]); + RangeResult results = co_await resultFutures[i]; if (results.size() > 0) - return true; + co_return true; } - return false; + co_return false; } // Verify that the task's keys are still in the timeout space at the expected timeout prefix - ACTOR static Future isFinished(Reference tr, - Reference taskBucket, - Reference task) { + static Future isFinished(Reference tr, + Reference taskBucket, + Reference task) { taskBucket->setOptions(tr); Tuple t = Tuple::makeTuple(task->timeoutVersion, task->key); - RangeResult values = wait(tr->getRange(taskBucket->timeouts.range(t), 1)); + RangeResult values = co_await tr->getRange(taskBucket->timeouts.range(t), 1); if (values.size() > 0) - return false; + co_return false; - return true; + co_return true; } - ACTOR static Future getActiveKey(Reference tr, - Reference taskBucket, - Optional startingValue) { + static Future getActiveKey(Reference tr, + Reference taskBucket, + Optional startingValue) { taskBucket->setOptions(tr); - Optional new_value = wait(tr->get(taskBucket->active.key())); + Optional new_value = co_await tr->get(taskBucket->active.key()); if (new_value != startingValue) { - return true; + co_return true; } - return false; + co_return false; } - ACTOR static Future checkActive(Database cx, Reference taskBucket) { - state Reference tr(new ReadYourWritesTransaction(cx)); - state Optional startingValue; + static Future checkActive(Database cx, Reference taskBucket) { + Reference tr(new ReadYourWritesTransaction(cx)); + Optional startingValue; loop { + Error err; try { taskBucket->setOptions(tr); - bool is_busy = wait(isBusy(tr, taskBucket)); + bool is_busy = co_await isBusy(tr, taskBucket); if (!is_busy) { - wait(success(addIdle(tr, taskBucket))); + co_await success(addIdle(tr, taskBucket)); } - Optional val = wait(tr->get(taskBucket->active.key())); + Optional val = co_await tr->get(taskBucket->active.key()); startingValue = val; - wait(tr->commit()); + co_await tr->commit(); break; } catch (Error& e) { - wait(tr->onError(e)); + err = e; } + co_await tr->onError(err); } - state int idx = 0; + int idx = 0; for (; idx < CLIENT_KNOBS->TASKBUCKET_CHECK_ACTIVE_AMOUNT; ++idx) { tr = makeReference(cx); loop { + Error err; try { taskBucket->setOptions(tr); - wait(delay(CLIENT_KNOBS->TASKBUCKET_CHECK_ACTIVE_DELAY)); - bool isActiveKey = wait(getActiveKey(tr, taskBucket, startingValue)); + co_await delay(CLIENT_KNOBS->TASKBUCKET_CHECK_ACTIVE_DELAY); + bool isActiveKey = co_await getActiveKey(tr, taskBucket, startingValue); if (isActiveKey) { CODE_PROBE(true, "checkActive return true"); - return true; + co_return true; } break; } catch (Error& e) { - wait(tr->onError(e)); + err = e; } + co_await tr->onError(err); } } CODE_PROBE(true, "checkActive return false"); - return false; + co_return false; } - ACTOR static Future getTaskCount(Reference tr, - Reference taskBucket) { + static Future getTaskCount(Reference tr, Reference taskBucket) { taskBucket->setOptions(tr); - Optional val = wait(tr->get(taskBucket->prefix.pack("task_count"_sr))); + Optional val = co_await tr->get(taskBucket->prefix.pack("task_count"_sr)); if (!val.present()) - return 0; + co_return 0; ASSERT(val.get().size() == sizeof(int64_t)); int64_t intValue = 0; memcpy(&intValue, val.get().begin(), val.get().size()); - return intValue; + co_return intValue; } // Looks for tasks that have timed out and returns them to be available tasks. // Returns True if any tasks were affected. - ACTOR static Future requeueTimedOutTasks(Reference tr, - Reference taskBucket) { + static Future requeueTimedOutTasks(Reference tr, + Reference taskBucket) { CODE_PROBE(true, "Looks for tasks that have timed out and returns them to be available tasks."); - Version end = wait(tr->getReadVersion()); - state KeyRange range( + Version end = co_await tr->getReadVersion(); + KeyRange range( KeyRangeRef(taskBucket->timeouts.get(0).range().begin, taskBucket->timeouts.get(end).range().end)); - RangeResult values = wait(tr->getRange(range, CLIENT_KNOBS->TASKBUCKET_MAX_TASK_KEYS)); + RangeResult values = co_await tr->getRange(range, CLIENT_KNOBS->TASKBUCKET_MAX_TASK_KEYS); // Keys will be tuples of (taskUID, param) -> paramValue // Unfortunately we need to know the priority parameter for a taskUID before we can know which available-tasks @@ -781,20 +787,20 @@ class TaskBucketImpl { if (values.size() > 0) { tr->clear(range); - return true; + co_return true; } - return false; + co_return false; } ASSERT(lastKey != Key()); tr->clear(KeyRangeRef(range.begin, lastKey)); - return true; + co_return true; } - ACTOR static Future debugPrintRange(Reference tr, Subspace subspace, Key msg) { + static Future debugPrintRange(Reference tr, Subspace subspace, Key msg) { tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::LOCK_AWARE); - RangeResult values = wait(tr->getRange(subspace.range(), CLIENT_KNOBS->TOO_MANY)); + RangeResult values = co_await tr->getRange(subspace.range(), CLIENT_KNOBS->TOO_MANY); TraceEvent("TaskBucketDebugPrintRange") .detail("Key", subspace.key()) .detail("Count", values.size()) @@ -808,23 +814,23 @@ class TaskBucketImpl { .detail("Value", s.value); }*/ - return Void(); + co_return; } - ACTOR static Future extendTimeout(Reference tr, - Reference taskBucket, - Reference task, - UpdateParams updateParams, - Version newTimeoutVersion) { + static Future extendTimeout(Reference tr, + Reference taskBucket, + Reference task, + UpdateParams updateParams, + Version newTimeoutVersion) { taskBucket->setOptions(tr); // First make sure it's safe to keep running - wait(taskBucket->keepRunning(tr, task)); + co_await taskBucket->keepRunning(tr, task); // This is where the task definition currently exists - state Subspace oldTimeoutSpace = taskBucket->timeouts.get(task->timeoutVersion).get(task->key); + Subspace oldTimeoutSpace = taskBucket->timeouts.get(task->timeoutVersion).get(task->key); // Update the task's timeout - Version version = wait(tr->getReadVersion()); + Version version = co_await tr->getReadVersion(); if (newTimeoutVersion == invalidVersion) newTimeoutVersion = version + taskBucket->timeout; @@ -838,7 +844,7 @@ class TaskBucketImpl { } // This is where the task definition is being moved to - state Subspace newTimeoutSpace = taskBucket->timeouts.get(newTimeoutVersion).get(task->key); + Subspace newTimeoutSpace = taskBucket->timeouts.get(newTimeoutVersion).get(task->key); tr->addReadConflictRange(oldTimeoutSpace.range()); tr->addWriteConflictRange(newTimeoutSpace.range()); @@ -852,7 +858,7 @@ class TaskBucketImpl { } else { CODE_PROBE(true, "Extended a task without updating parameters"); // Otherwise, read and transplant the params from the old to new timeout spaces - RangeResult params = wait(tr->getRange(oldTimeoutSpace.range(), CLIENT_KNOBS->TOO_MANY)); + RangeResult params = co_await tr->getRange(oldTimeoutSpace.range(), CLIENT_KNOBS->TOO_MANY); for (auto& kv : params) { Tuple paramKey = oldTimeoutSpace.unpack(kv.key); tr->set(newTimeoutSpace.pack(paramKey), kv.value); @@ -861,7 +867,7 @@ class TaskBucketImpl { tr->clear(oldTimeoutSpace.range()); - return newTimeoutVersion; + co_return newTimeoutVersion; } }; @@ -928,13 +934,13 @@ void TaskBucket::setValidationCondition(Reference task, KeyRef vKey, KeyRe task->params[Task::reservedTaskParamValidValue] = vValue; } -ACTOR static Future actorAddTask(TaskBucket* tb, - Reference tr, - Reference task, - KeyRef validationKey) { +static Future actorAddTask(TaskBucket* tb, + Reference tr, + Reference task, + KeyRef validationKey) { tb->setOptions(tr); - Optional validationValue = wait(tr->get(validationKey)); + Optional validationValue = co_await tr->get(validationKey); if (!validationValue.present()) { TraceEvent(SevError, "TaskBucketAddTaskInvalidKey") @@ -945,7 +951,7 @@ ACTOR static Future actorAddTask(TaskBucket* tb, TaskBucket::setValidationCondition(task, validationKey, validationValue.get()); - return tb->addTask(tr, task); + co_return tb->addTask(tr, task); } Future TaskBucket::addTask(Reference tr, Reference task, KeyRef validationKey) { @@ -1032,11 +1038,11 @@ Future TaskBucket::debugPrintRange(Reference tr class FutureBucketImpl { public: - ACTOR static Future isEmpty(Reference tr, Reference futureBucket) { + static Future isEmpty(Reference tr, Reference futureBucket) { futureBucket->setOptions(tr); - Key lastKey = wait(tr->getKey(lastLessOrEqual(futureBucket->prefix.pack(maxUIDKey)))); - return !futureBucket->prefix.contains(lastKey); + Key lastKey = co_await tr->getKey(lastLessOrEqual(futureBucket->prefix.pack(maxUIDKey))); + co_return !futureBucket->prefix.contains(lastKey); } }; @@ -1071,29 +1077,29 @@ Reference FutureBucket::unpack(Key key) { class TaskFutureImpl { public: - ACTOR static Future join(Reference tr, - Reference taskBucket, - Reference taskFuture, - std::vector> vectorFuture) { + static Future join(Reference tr, + Reference taskBucket, + Reference taskFuture, + std::vector> vectorFuture) { taskFuture->futureBucket->setOptions(tr); - bool is_set = wait(isSet(tr, taskFuture)); + bool is_set = co_await isSet(tr, taskFuture); // A taskFuture cannot be already set for it to be joined with others. if (is_set) { - return Void(); + co_return; } tr->clear(taskFuture->blocks.pack(StringRef())); - wait(_join(tr, taskBucket, taskFuture, vectorFuture)); + co_await _join(tr, taskBucket, taskFuture, vectorFuture); - return Void(); + co_return; } - ACTOR static Future _join(Reference tr, - Reference taskBucket, - Reference taskFuture, - std::vector> vectorFuture) { + static Future _join(Reference tr, + Reference taskBucket, + Reference taskFuture, + std::vector> vectorFuture) { std::vector> onSetFutures; for (int i = 0; i < vectorFuture.size(); ++i) { Key key = StringRef(deterministicRandom()->randomUniqueID().toString()); @@ -1105,32 +1111,32 @@ class TaskFutureImpl { onSetFutures.push_back(vectorFuture[i]->onSet(tr, taskBucket, task)); } - wait(waitForAll(onSetFutures)); + co_await waitForAll(onSetFutures); - return Void(); + co_return; } - ACTOR static Future isSet(Reference tr, Reference taskFuture) { + static Future isSet(Reference tr, Reference taskFuture) { taskFuture->futureBucket->setOptions(tr); - RangeResult values = wait(tr->getRange(taskFuture->blocks.range(), 1)); + RangeResult values = co_await tr->getRange(taskFuture->blocks.range(), 1); if (values.size() > 0) - return false; + co_return false; - return true; + co_return true; } - ACTOR static Future onSet(Reference tr, - Reference taskBucket, - Reference taskFuture, - Reference task) { + static Future onSet(Reference tr, + Reference taskBucket, + Reference taskFuture, + Reference task) { taskFuture->futureBucket->setOptions(tr); - bool is_set = wait(isSet(tr, taskFuture)); + bool is_set = co_await isSet(tr, taskFuture); if (is_set) { CODE_PROBE(true, "is_set == true"); - wait(performAction(tr, taskBucket, taskFuture, task)); + co_await performAction(tr, taskBucket, taskFuture, task); } else { CODE_PROBE(true, "is_set == false"); Subspace callbackSpace = @@ -1140,49 +1146,49 @@ class TaskFutureImpl { } } - return Void(); + co_return; } - ACTOR static Future set(Reference tr, - Reference taskBucket, - Reference taskFuture) { + static Future set(Reference tr, + Reference taskBucket, + Reference taskFuture) { taskFuture->futureBucket->setOptions(tr); tr->clear(taskFuture->blocks.range()); - wait(performAllActions(tr, taskBucket, taskFuture)); + co_await performAllActions(tr, taskBucket, taskFuture); - return Void(); + co_return; } - ACTOR static Future performAction(Reference tr, - Reference taskBucket, - Reference taskFuture, - Reference task) { + static Future performAction(Reference tr, + Reference taskBucket, + Reference taskFuture, + Reference task) { taskFuture->futureBucket->setOptions(tr); if (task && TaskFuncBase::isValidTask(task)) { Reference taskFunc = TaskFuncBase::create(task->params[Task::reservedTaskParamKeyType]); if (taskFunc.getPtr()) { - wait(taskFunc->finish(tr, taskBucket, taskFuture->futureBucket, task)); + co_await taskFunc->finish(tr, taskBucket, taskFuture->futureBucket, task); } } - return Void(); + co_return; } - ACTOR static Future performAllActions(Reference tr, - Reference taskBucket, - Reference taskFuture) { + static Future performAllActions(Reference tr, + Reference taskBucket, + Reference taskFuture) { taskFuture->futureBucket->setOptions(tr); - RangeResult values = wait(tr->getRange(taskFuture->callbacks.range(), CLIENT_KNOBS->TOO_MANY)); + RangeResult values = co_await tr->getRange(taskFuture->callbacks.range(), CLIENT_KNOBS->TOO_MANY); tr->clear(taskFuture->callbacks.range()); std::vector> actions; if (values.size() != 0) { - state Reference task(new Task()); + Reference task(new Task()); Key lastTaskID; for (auto& s : values) { Tuple t = taskFuture->callbacks.unpack(s.key); @@ -1201,32 +1207,32 @@ class TaskFutureImpl { actions.push_back(performAction(tr, taskBucket, taskFuture, task)); } - wait(waitForAll(actions)); + co_await waitForAll(actions); - return Void(); + co_return; } - ACTOR static Future onSetAddTask(Reference tr, - Reference taskBucket, - Reference taskFuture, - Reference task) { + static Future onSetAddTask(Reference tr, + Reference taskBucket, + Reference taskFuture, + Reference task) { taskFuture->futureBucket->setOptions(tr); task->params[Task::reservedTaskParamKeyAddTask] = task->params[Task::reservedTaskParamKeyType]; task->params[Task::reservedTaskParamKeyType] = "AddTask"_sr; - wait(onSet(tr, taskBucket, taskFuture, task)); + co_await onSet(tr, taskBucket, taskFuture, task); - return Void(); + co_return; } - ACTOR static Future onSetAddTask(Reference tr, - Reference taskBucket, - Reference taskFuture, - Reference task, - KeyRef validationKey) { + static Future onSetAddTask(Reference tr, + Reference taskBucket, + Reference taskFuture, + Reference task, + KeyRef validationKey) { taskFuture->futureBucket->setOptions(tr); - Optional validationValue = wait(tr->get(validationKey)); + Optional validationValue = co_await tr->get(validationKey); if (!validationValue.present()) { TraceEvent(SevError, "TaskBucketOnSetAddTaskInvalidKey") @@ -1238,9 +1244,9 @@ class TaskFutureImpl { task->params[Task::reservedTaskParamValidKey] = validationKey; task->params[Task::reservedTaskParamValidValue] = validationValue.get(); - wait(onSetAddTask(tr, taskBucket, taskFuture, task)); + co_await onSetAddTask(tr, taskBucket, taskFuture, task); - return Void(); + co_return; } static Future onSetAddTask(Reference tr, @@ -1257,18 +1263,18 @@ class TaskFutureImpl { return onSetAddTask(tr, taskBucket, taskFuture, task); } - ACTOR static Future> joinedFuture(Reference tr, - Reference taskBucket, - Reference taskFuture) { + static Future> joinedFuture(Reference tr, + Reference taskBucket, + Reference taskFuture) { taskFuture->futureBucket->setOptions(tr); std::vector> vectorFuture; // the next line means generate a new task future with different key, // but share the same prefix of futureBucket with the input taskFuture - state Reference future = taskFuture->futureBucket->future(tr); + Reference future = taskFuture->futureBucket->future(tr); vectorFuture.push_back(future); - wait(join(tr, taskBucket, taskFuture, vectorFuture)); - return future; + co_await join(tr, taskBucket, taskFuture, vectorFuture); + co_return future; } }; @@ -1341,11 +1347,11 @@ Future> TaskFuture::joinedFuture(Reference::addRef(this)); } -ACTOR Future getCompletionKey(TaskCompletionKey* self, Future> f) { - Reference taskFuture = wait(f); +Future getCompletionKey(TaskCompletionKey* self, Future> f) { + Reference taskFuture = co_await f; self->joinFuture.clear(); self->key = taskFuture->key; - return self->key.get(); + co_return self->key.get(); } Future TaskCompletionKey::get(Reference tr, Reference taskBucket) { From 964c532fa056ef08059d20fd9855dd166e17c480 Mon Sep 17 00:00:00 2001 From: Trevor Clinkenbeard Date: Tue, 17 Mar 2026 21:04:34 +0000 Subject: [PATCH 2/7] Remove shadowing in TaskBucketImpl::doTask --- fdbclient/TaskBucket.actor.cpp | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/fdbclient/TaskBucket.actor.cpp b/fdbclient/TaskBucket.actor.cpp index b65f081ed05..0ef7998cd61 100644 --- a/fdbclient/TaskBucket.actor.cpp +++ b/fdbclient/TaskBucket.actor.cpp @@ -415,7 +415,7 @@ class TaskBucketImpl { if (!task || !TaskFuncBase::isValidTask(task)) co_return false; - Optional err; + Error err; try { taskFunc = TaskFuncBase::create(task->params[Task::reservedTaskParamKeyType]); if (taskFunc) { @@ -425,7 +425,7 @@ class TaskBucketImpl { loop { Reference tr(new ReadYourWritesTransaction(cx)); taskBucket->setOptions(tr); - Error err; + Error innerErr; try { bool validTask = co_await taskVerify(taskBucket, tr, task); @@ -439,9 +439,9 @@ class TaskBucketImpl { } break; } catch (Error& e) { - err = e; + innerErr = e; } - co_await tr->onError(err); + co_await tr->onError(innerErr); } } @@ -459,12 +459,12 @@ class TaskBucketImpl { err = e; } TraceEvent(SevWarn, "TaskBucketExecuteFailure") - .error(err.get()) + .error(err) .detail("TaskUID", task->key) .detail("TaskType", task->params[Task::reservedTaskParamKeyType].printable()) .detail("Priority", task->getPriority()); try { - co_await taskFunc->handleError(cx, task, err.get()); + co_await taskFunc->handleError(cx, task, err); } catch (Error& handleErr) { TraceEvent(SevWarn, "TaskBucketExecuteFailureLogErrorFailed") .error(handleErr) // output handleError() error instead of original task error From e677d87b1ac8772c6fd6ba804062c0691645a96b Mon Sep 17 00:00:00 2001 From: Trevor Clinkenbeard Date: Tue, 17 Mar 2026 21:06:41 +0000 Subject: [PATCH 3/7] Remove trailing co_return statements --- fdbclient/TaskBucket.actor.cpp | 22 ---------------------- 1 file changed, 22 deletions(-) diff --git a/fdbclient/TaskBucket.actor.cpp b/fdbclient/TaskBucket.actor.cpp index 0ef7998cd61..ed31faf2f6c 100644 --- a/fdbclient/TaskBucket.actor.cpp +++ b/fdbclient/TaskBucket.actor.cpp @@ -58,8 +58,6 @@ struct UnblockFutureTaskFunc : TaskFuncBase { if (is_set) { co_await future->performAllActions(tr, taskBucket); } - - co_return; } }; StringRef UnblockFutureTaskFunc::name = "UnblockFuture"_sr; @@ -348,8 +346,6 @@ class TaskBucketImpl { } else { co_await taskFunc->finish(tr, taskBucket, futureBucket, task); } - - co_return; } static Future doOne(Database cx, Reference taskBucket, Reference futureBucket) { @@ -813,8 +809,6 @@ class TaskBucketImpl { .detail("Key", s.key) .detail("Value", s.value); }*/ - - co_return; } static Future extendTimeout(Reference tr, @@ -1092,8 +1086,6 @@ class TaskFutureImpl { tr->clear(taskFuture->blocks.pack(StringRef())); co_await _join(tr, taskBucket, taskFuture, vectorFuture); - - co_return; } static Future _join(Reference tr, @@ -1112,8 +1104,6 @@ class TaskFutureImpl { } co_await waitForAll(onSetFutures); - - co_return; } static Future isSet(Reference tr, Reference taskFuture) { @@ -1145,8 +1135,6 @@ class TaskFutureImpl { tr->set(callbackSpace.pack(v.key), v.value); } } - - co_return; } static Future set(Reference tr, @@ -1157,8 +1145,6 @@ class TaskFutureImpl { tr->clear(taskFuture->blocks.range()); co_await performAllActions(tr, taskBucket, taskFuture); - - co_return; } static Future performAction(Reference tr, @@ -1173,8 +1159,6 @@ class TaskFutureImpl { co_await taskFunc->finish(tr, taskBucket, taskFuture->futureBucket, task); } } - - co_return; } static Future performAllActions(Reference tr, @@ -1208,8 +1192,6 @@ class TaskFutureImpl { } co_await waitForAll(actions); - - co_return; } static Future onSetAddTask(Reference tr, @@ -1221,8 +1203,6 @@ class TaskFutureImpl { task->params[Task::reservedTaskParamKeyAddTask] = task->params[Task::reservedTaskParamKeyType]; task->params[Task::reservedTaskParamKeyType] = "AddTask"_sr; co_await onSet(tr, taskBucket, taskFuture, task); - - co_return; } static Future onSetAddTask(Reference tr, @@ -1245,8 +1225,6 @@ class TaskFutureImpl { task->params[Task::reservedTaskParamValidValue] = validationValue.get(); co_await onSetAddTask(tr, taskBucket, taskFuture, task); - - co_return; } static Future onSetAddTask(Reference tr, From 1a37d35f40bf6e6a3e94651dc0df124cd4f30406 Mon Sep 17 00:00:00 2001 From: Trevor Clinkenbeard Date: Tue, 17 Mar 2026 21:14:42 +0000 Subject: [PATCH 4/7] Rename fdbclient/TaskBucket.actor.cpp --- .../{TaskBucket.actor.cpp => TaskBucket.cpp} | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) rename fdbclient/{TaskBucket.actor.cpp => TaskBucket.cpp} (99%) diff --git a/fdbclient/TaskBucket.actor.cpp b/fdbclient/TaskBucket.cpp similarity index 99% rename from fdbclient/TaskBucket.actor.cpp rename to fdbclient/TaskBucket.cpp index ed31faf2f6c..93410745c85 100644 --- a/fdbclient/TaskBucket.actor.cpp +++ b/fdbclient/TaskBucket.cpp @@ -1,5 +1,5 @@ /* - * TaskBucket.actor.cpp + * TaskBucket.cpp * * This source file is part of the FoundationDB open source project * @@ -21,7 +21,6 @@ #include "fdbclient/TaskBucket.h" #include "fdbclient/FDBTypes.h" #include "fdbclient/ReadYourWrites.h" -#include "flow/actorcompiler.h" // has to be last include Reference Task::getDoneFuture(Reference fb) { return fb->unpack(params[reservedTaskParamKeyDone]); @@ -312,7 +311,7 @@ class TaskBucketImpl { } static Future taskVerify(Reference tb, Database cx, Reference task) { - loop { + while (true) { Reference tr(new ReadYourWritesTransaction(cx)); Error err; try { @@ -362,7 +361,7 @@ class TaskBucketImpl { return map(tr->getReadVersion(), [=](Version v) { return v; }); }); - loop { + while (true) { FlowLock::Releaser releaser; // Wait until we are half way to the timeout version of this task @@ -381,7 +380,7 @@ class TaskBucketImpl { co_await task->extendMutex.take(); releaser = FlowLock::Releaser(task->extendMutex, 1); - loop { + while (true) { Error err; try { tr->reset(); @@ -418,7 +417,7 @@ class TaskBucketImpl { verifyTask.set(task->params.find(Task::reservedTaskParamValidKey) != task->params.end()); if (verifyTask) { - loop { + while (true) { Reference tr(new ReadYourWritesTransaction(cx)); taskBucket->setOptions(tr); Error innerErr; @@ -492,7 +491,7 @@ class TaskBucketImpl { std::vector>> getTasks; unsigned int getBatchSize = 1; - loop { + while (true) { // Start running tasks while slots are available and we keep finding work to do ++taskBucket->dispatchSlotChecksStarted; while (!availableSlots.empty()) { @@ -553,7 +552,7 @@ class TaskBucketImpl { } static Future watchPaused(Database cx, Reference taskBucket, Reference> paused) { - loop { + while (true) { Reference tr(new ReadYourWritesTransaction(cx)); Error err; try { @@ -579,7 +578,7 @@ class TaskBucketImpl { Future watchPausedFuture = watchPaused(cx, taskBucket, paused); taskBucket->metricLogger = taskBucket->cc.traceCounters( "TaskBucketMetrics", taskBucket->dbgid, CLIENT_KNOBS->TASKBUCKET_LOGGING_DELAY); - loop { + while (true) { while (paused->get()) { co_await (paused->onChange() || watchPausedFuture); } @@ -673,7 +672,7 @@ class TaskBucketImpl { Reference tr(new ReadYourWritesTransaction(cx)); Optional startingValue; - loop { + while (true) { Error err; try { taskBucket->setOptions(tr); @@ -697,7 +696,7 @@ class TaskBucketImpl { int idx = 0; for (; idx < CLIENT_KNOBS->TASKBUCKET_CHECK_ACTIVE_AMOUNT; ++idx) { tr = makeReference(cx); - loop { + while (true) { Error err; try { taskBucket->setOptions(tr); From 19c54c9819895462ddf143eca9f0f7c9befadcf6 Mon Sep 17 00:00:00 2001 From: Trevor Clinkenbeard Date: Wed, 18 Mar 2026 06:25:46 +0000 Subject: [PATCH 5/7] Fix bug in watchPaused --- fdbclient/TaskBucket.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/fdbclient/TaskBucket.cpp b/fdbclient/TaskBucket.cpp index 93410745c85..7f845b7d6a3 100644 --- a/fdbclient/TaskBucket.cpp +++ b/fdbclient/TaskBucket.cpp @@ -553,7 +553,7 @@ class TaskBucketImpl { static Future watchPaused(Database cx, Reference taskBucket, Reference> paused) { while (true) { - Reference tr(new ReadYourWritesTransaction(cx)); + auto tr = makeReference(cx); Error err; try { taskBucket->setOptions(tr); @@ -562,6 +562,7 @@ class TaskBucketImpl { Future watchPausedFuture = tr->watch(taskBucket->pauseKey); co_await tr->commit(); co_await watchPausedFuture; + continue; } catch (Error& e) { err = e; } From 810303075ece94bbe7d8b1256d021384fde25a2a Mon Sep 17 00:00:00 2001 From: Trevor Clinkenbeard Date: Fri, 20 Mar 2026 04:08:50 +0000 Subject: [PATCH 6/7] Simplify loop variable initialization --- fdbclient/TaskBucket.cpp | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/fdbclient/TaskBucket.cpp b/fdbclient/TaskBucket.cpp index 7f845b7d6a3..8bcfd9a5c88 100644 --- a/fdbclient/TaskBucket.cpp +++ b/fdbclient/TaskBucket.cpp @@ -200,8 +200,7 @@ class TaskBucketImpl { std::vector>> taskKeyFutures(CLIENT_KNOBS->TASKBUCKET_MAX_PRIORITY + 1); // Start looking for a task at each priority, highest first - int pri{ 0 }; - for (pri = CLIENT_KNOBS->TASKBUCKET_MAX_PRIORITY; pri >= 0; --pri) + for (int pri = CLIENT_KNOBS->TASKBUCKET_MAX_PRIORITY; pri >= 0; --pri) taskKeyFutures[pri] = getTaskKey(tr, taskBucket, pri); // Task key and subspace it is located in. @@ -610,8 +609,7 @@ class TaskBucketImpl { resultFutures.push_back(tr->getRange(taskBucket->getAvailableSpace(pri).range(), 1)); // If any priority levels have any keys then the taskbucket is not empty so return false - int i{ 0 }; - for (i = 0; i < resultFutures.size(); ++i) { + for (int i = 0; i < resultFutures.size(); ++i) { RangeResult results = co_await resultFutures[i]; if (results.size() > 0) co_return false; @@ -633,8 +631,7 @@ class TaskBucketImpl { resultFutures.push_back(tr->getRange(taskBucket->getAvailableSpace(pri).range(), 1)); // If any priority levels have any keys then return true as the level is 'busy' - int i{ 0 }; - for (i = 0; i < resultFutures.size(); ++i) { + for (int i = 0; i < resultFutures.size(); ++i) { RangeResult results = co_await resultFutures[i]; if (results.size() > 0) co_return true; From 424dc676c155429b7cc4dca269b2f602dd26dac7 Mon Sep 17 00:00:00 2001 From: Trevor Clinkenbeard Date: Fri, 20 Mar 2026 05:49:29 +0000 Subject: [PATCH 7/7] Fix for loop variable initialization --- fdbclient/TaskBucket.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fdbclient/TaskBucket.cpp b/fdbclient/TaskBucket.cpp index 8bcfd9a5c88..90c3d64eb20 100644 --- a/fdbclient/TaskBucket.cpp +++ b/fdbclient/TaskBucket.cpp @@ -209,7 +209,7 @@ class TaskBucketImpl { // In priority order from highest to lowest, wait for fetch to finish and if it found a task then cancel the // rest. - for (pri = CLIENT_KNOBS->TASKBUCKET_MAX_PRIORITY; pri >= 0; --pri) { + for (int pri = CLIENT_KNOBS->TASKBUCKET_MAX_PRIORITY; pri >= 0; --pri) { // If we already have a task key then cancel this fetch if (taskKey.present()) taskKeyFutures[pri].cancel();