diff --git a/fdbclient/TaskBucket.actor.cpp b/fdbclient/TaskBucket.cpp similarity index 69% rename from fdbclient/TaskBucket.actor.cpp rename to fdbclient/TaskBucket.cpp index d3d619c84b8..90c3d64eb20 100644 --- a/fdbclient/TaskBucket.actor.cpp +++ b/fdbclient/TaskBucket.cpp @@ -1,5 +1,5 @@ /* - * TaskBucket.actor.cpp + * TaskBucket.cpp * * This source file is part of the FoundationDB open source project * @@ -21,7 +21,6 @@ #include "fdbclient/TaskBucket.h" #include "fdbclient/FDBTypes.h" #include "fdbclient/ReadYourWrites.h" -#include "flow/actorcompiler.h" // has to be last include Reference Task::getDoneFuture(Reference fb) { return fb->unpack(params[reservedTaskParamKeyDone]); @@ -44,22 +43,20 @@ struct UnblockFutureTaskFunc : TaskFuncBase { return _finish(tr, tb, fb, task); }; - ACTOR static Future _finish(Reference tr, - Reference taskBucket, - Reference futureBucket, - Reference task) { - state Reference future = futureBucket->unpack(task->params[Task::reservedTaskParamKeyFuture]); + static Future _finish(Reference tr, + Reference taskBucket, + Reference futureBucket, + Reference task) { + Reference future = futureBucket->unpack(task->params[Task::reservedTaskParamKeyFuture]); futureBucket->setOptions(tr); tr->clear(future->blocks.pack(task->params[Task::reservedTaskParamKeyBlockID])); - bool is_set = wait(future->isSet(tr)); + bool is_set = co_await future->isSet(tr); if (is_set) { - wait(future->performAllActions(tr, taskBucket)); + co_await future->performAllActions(tr, taskBucket); } - - return Void(); } }; StringRef UnblockFutureTaskFunc::name = "UnblockFuture"_sr; @@ -159,36 +156,35 @@ unsigned int Task::getPriority() const { class TaskBucketImpl { public: - ACTOR static Future> getTaskKey(Reference tr, - Reference taskBucket, - int priority = 0) { + static Future> getTaskKey(Reference tr, + Reference taskBucket, + int priority = 0) { Standalone uid = StringRef(deterministicRandom()->randomUniqueID().toString()); // Get keyspace for the specified priority level - state Subspace space = taskBucket->getAvailableSpace(priority); + Subspace space = taskBucket->getAvailableSpace(priority); { // Get a task key that is <= a random UID task key, if successful then return it RangeResult value = - wait(tr->getRange(KeyRangeRef(space.key(), space.pack(uid)), 1, Snapshot::True, Reverse::True)); + co_await tr->getRange(KeyRangeRef(space.key(), space.pack(uid)), 1, Snapshot::True, Reverse::True); if (!value.empty()) { - return Optional(value[0].key); + co_return Optional(value[0].key); } } { // Get a task key that is <= the maximum possible UID, if successful return it. - RangeResult value = - wait(tr->getRange(KeyRangeRef(space.key(), space.pack(maxUIDKey)), 1, Snapshot::True, Reverse::True)); + RangeResult value = co_await tr->getRange( + KeyRangeRef(space.key(), space.pack(maxUIDKey)), 1, Snapshot::True, Reverse::True); if (!value.empty()) { - return Optional(value[0].key); + co_return Optional(value[0].key); } } - return Optional(); + co_return Optional(); } - ACTOR static Future> getOne(Reference tr, - Reference taskBucket) { + static Future> getOne(Reference tr, Reference taskBucket) { if (taskBucket->priority_batch) tr->setOption(FDBTransactionOptions::PRIORITY_BATCH); @@ -197,29 +193,28 @@ class TaskBucketImpl { // give it some chances for the timed out tasks to get into the task loop in the case of // many other new tasks get added so that the timed out tasks never get chances to re-run if (deterministicRandom()->random01() < CLIENT_KNOBS->TASKBUCKET_CHECK_TIMEOUT_CHANCE) { - bool anyTimeouts = wait(requeueTimedOutTasks(tr, taskBucket)); + bool anyTimeouts = co_await requeueTimedOutTasks(tr, taskBucket); CODE_PROBE(anyTimeouts, "Found a task that timed out"); } - state std::vector>> taskKeyFutures(CLIENT_KNOBS->TASKBUCKET_MAX_PRIORITY + 1); + std::vector>> taskKeyFutures(CLIENT_KNOBS->TASKBUCKET_MAX_PRIORITY + 1); // Start looking for a task at each priority, highest first - state int pri; - for (pri = CLIENT_KNOBS->TASKBUCKET_MAX_PRIORITY; pri >= 0; --pri) + for (int pri = CLIENT_KNOBS->TASKBUCKET_MAX_PRIORITY; pri >= 0; --pri) taskKeyFutures[pri] = getTaskKey(tr, taskBucket, pri); // Task key and subspace it is located in. - state Optional taskKey; - state Subspace availableSpace; + Optional taskKey; + Subspace availableSpace; // In priority order from highest to lowest, wait for fetch to finish and if it found a task then cancel the // rest. - for (pri = CLIENT_KNOBS->TASKBUCKET_MAX_PRIORITY; pri >= 0; --pri) { + for (int pri = CLIENT_KNOBS->TASKBUCKET_MAX_PRIORITY; pri >= 0; --pri) { // If we already have a task key then cancel this fetch if (taskKey.present()) taskKeyFutures[pri].cancel(); else { - Optional key = wait(taskKeyFutures[pri]); + Optional key = co_await taskKeyFutures[pri]; if (key.present()) { taskKey = key; availableSpace = taskBucket->getAvailableSpace(pri); @@ -229,26 +224,26 @@ class TaskBucketImpl { // If we don't have a task key, requeue timed out tasks and try again by calling self. if (!taskKey.present()) { - bool anyTimeouts = wait(requeueTimedOutTasks(tr, taskBucket)); + bool anyTimeouts = co_await requeueTimedOutTasks(tr, taskBucket); // If there were timeouts, try to get a task since there should now be one in one of the available spaces. if (anyTimeouts) { CODE_PROBE(true, "Try to get one task from timeouts subspace"); - Reference task = wait(getOne(tr, taskBucket)); - return task; + Reference task = co_await getOne(tr, taskBucket); + co_return task; } - return Reference(); + co_return Reference(); } // Now we know the task key is present and we have the available space for the task's priority - state Tuple t = availableSpace.unpack(taskKey.get()); - state Key taskUID = t.getString(0); - state Subspace taskAvailableSpace = availableSpace.get(taskUID); + Tuple t = availableSpace.unpack(taskKey.get()); + Key taskUID = t.getString(0); + Subspace taskAvailableSpace = availableSpace.get(taskUID); - state Reference task(new Task()); + Reference task(new Task()); task->key = taskUID; - state RangeResult values = wait(tr->getRange(taskAvailableSpace.range(), CLIENT_KNOBS->TOO_MANY)); - Version version = wait(tr->getReadVersion()); + RangeResult values = co_await tr->getRange(taskAvailableSpace.range(), CLIENT_KNOBS->TOO_MANY); + Version version = co_await tr->getReadVersion(); task->timeoutVersion = version + (uint64_t)(taskBucket->timeout * (CLIENT_KNOBS->TASKBUCKET_TIMEOUT_JITTER_OFFSET + @@ -266,19 +261,19 @@ class TaskBucketImpl { tr->clear(taskAvailableSpace.range()); tr->set(taskBucket->active.key(), deterministicRandom()->randomUniqueID().toString()); - return task; + co_return task; } // Verify that the user configured task verification key still has the user specified value - ACTOR static Future taskVerify(Reference tb, - Reference tr, - Reference task) { + static Future taskVerify(Reference tb, + Reference tr, + Reference task) { if (task->params.find(Task::reservedTaskParamValidKey) == task->params.end()) { TraceEvent("TaskBucketTaskVerifyInvalidTask") .detail("Task", task->params[Task::reservedTaskParamKeyType]) .detail("ReservedTaskParamValidKey", "missing"); - return false; + co_return false; } if (task->params.find(Task::reservedTaskParamValidValue) == task->params.end()) { @@ -286,12 +281,12 @@ class TaskBucketImpl { .detail("Task", task->params[Task::reservedTaskParamKeyType]) .detail("ReservedTaskParamValidKey", task->params[Task::reservedTaskParamValidKey]) .detail("ReservedTaskParamValidValue", "missing"); - return false; + co_return false; } tb->setOptions(tr); - Optional keyValue = wait(tr->get(task->params[Task::reservedTaskParamValidKey])); + Optional keyValue = co_await tr->get(task->params[Task::reservedTaskParamValidKey]); if (!keyValue.present()) { TraceEvent("TaskBucketTaskVerifyInvalidTask") @@ -299,7 +294,7 @@ class TaskBucketImpl { .detail("ReservedTaskParamValidKey", task->params[Task::reservedTaskParamValidKey]) .detail("ReservedTaskParamValidValue", task->params[Task::reservedTaskParamValidValue]) .detail("KeyValue", "missing"); - return false; + co_return false; } if (keyValue.get().compare(StringRef(task->params[Task::reservedTaskParamValidValue]))) { @@ -308,74 +303,70 @@ class TaskBucketImpl { .detail("ReservedTaskParamValidKey", task->params[Task::reservedTaskParamValidKey]) .detail("ReservedTaskParamValidValue", task->params[Task::reservedTaskParamValidValue]) .detail("KeyValue", keyValue.get()); - return false; + co_return false; } - return true; + co_return true; } - ACTOR static Future taskVerify(Reference tb, Database cx, Reference task) { - loop { - state Reference tr(new ReadYourWritesTransaction(cx)); - + static Future taskVerify(Reference tb, Database cx, Reference task) { + while (true) { + Reference tr(new ReadYourWritesTransaction(cx)); + Error err; try { - bool verified = wait(taskVerify(tb, tr, task)); - return verified; + bool verified = co_await taskVerify(tb, tr, task); + co_return verified; } catch (Error& e) { - wait(tr->onError(e)); + err = e; } + co_await tr->onError(err); } } - ACTOR static Future finishTaskRun(Reference tr, - Reference taskBucket, - Reference futureBucket, - Reference task, - Reference taskFunc, - VerifyTask verifyTask) { - bool isFinished = wait(taskBucket->isFinished(tr, task)); + static Future finishTaskRun(Reference tr, + Reference taskBucket, + Reference futureBucket, + Reference task, + Reference taskFunc, + VerifyTask verifyTask) { + bool isFinished = co_await taskBucket->isFinished(tr, task); if (isFinished) { - return Void(); + co_return; } - state bool validTask = true; + bool validTask = true; if (verifyTask) { - bool _validTask = wait(taskVerify(taskBucket, tr, task)); + bool _validTask = co_await taskVerify(taskBucket, tr, task); validTask = _validTask; } if (!validTask) { - wait(taskBucket->finish(tr, task)); + co_await taskBucket->finish(tr, task); } else { - wait(taskFunc->finish(tr, taskBucket, futureBucket, task)); + co_await taskFunc->finish(tr, taskBucket, futureBucket, task); } - - return Void(); } - ACTOR static Future doOne(Database cx, - Reference taskBucket, - Reference futureBucket) { - state Reference task = wait(taskBucket->getOne(cx)); - bool result = wait(taskBucket->doTask(cx, futureBucket, task)); - return result; + static Future doOne(Database cx, Reference taskBucket, Reference futureBucket) { + Reference task = co_await taskBucket->getOne(cx); + bool result = co_await taskBucket->doTask(cx, futureBucket, task); + co_return result; } - ACTOR static Future extendTimeoutRepeatedly(Database cx, - Reference taskBucket, - Reference task) { - state Reference tr(new ReadYourWritesTransaction(cx)); - state double start = now(); - state Version versionNow = wait(runRYWTransaction(cx, [=](Reference tr) { + static Future extendTimeoutRepeatedly(Database cx, Reference taskBucket, Reference task) { + Reference tr(new ReadYourWritesTransaction(cx)); + double start = now(); + Version versionNow = co_await runRYWTransaction(cx, [=](Reference tr) { taskBucket->setOptions(tr); return map(tr->getReadVersion(), [=](Version v) { return v; }); - })); + }); - loop { - state FlowLock::Releaser releaser; + while (true) { + FlowLock::Releaser releaser; // Wait until we are half way to the timeout version of this task - wait(delay(0.8 * (BUGGIFY ? (2 * deterministicRandom()->random01()) : 1.0) * - (double)(task->timeoutVersion - (uint64_t)versionNow) / CLIENT_KNOBS->CORE_VERSIONSPERSECOND)); + co_await delay(0.8 * (BUGGIFY ? (2 * deterministicRandom()->random01()) : 1.0) * + (double)(task->timeoutVersion - (uint64_t)versionNow) / + CLIENT_KNOBS->CORE_VERSIONSPERSECOND); if (now() - start > 300) { TraceEvent(SevWarnAlways, "TaskBucketLongExtend") @@ -385,122 +376,128 @@ class TaskBucketImpl { .detail("Priority", task->getPriority()); } // Take the extendMutex lock until we either succeed or stop trying to extend due to failure - wait(task->extendMutex.take()); + co_await task->extendMutex.take(); releaser = FlowLock::Releaser(task->extendMutex, 1); - loop { + while (true) { + Error err; try { tr->reset(); taskBucket->setOptions(tr); // Attempt to extend the task's timeout - state Version newTimeout = wait(taskBucket->extendTimeout(tr, task, UpdateParams::False)); - wait(tr->commit()); + Version newTimeout = co_await taskBucket->extendTimeout(tr, task, UpdateParams::False); + co_await tr->commit(); task->timeoutVersion = newTimeout; versionNow = tr->getCommittedVersion(); break; } catch (Error& e) { - wait(tr->onError(e)); + err = e; } + co_await tr->onError(err); } } } - ACTOR static Future doTask(Database cx, - Reference taskBucket, - Reference futureBucket, - Reference task) { - state Reference taskFunc; - state VerifyTask verifyTask(false); + static Future doTask(Database cx, + Reference taskBucket, + Reference futureBucket, + Reference task) { + Reference taskFunc; + VerifyTask verifyTask(false); if (!task || !TaskFuncBase::isValidTask(task)) - return false; + co_return false; + Error err; try { taskFunc = TaskFuncBase::create(task->params[Task::reservedTaskParamKeyType]); if (taskFunc) { verifyTask.set(task->params.find(Task::reservedTaskParamValidKey) != task->params.end()); if (verifyTask) { - loop { - state Reference tr(new ReadYourWritesTransaction(cx)); + while (true) { + Reference tr(new ReadYourWritesTransaction(cx)); taskBucket->setOptions(tr); - + Error innerErr; try { - bool validTask = wait(taskVerify(taskBucket, tr, task)); + bool validTask = co_await taskVerify(taskBucket, tr, task); if (!validTask) { - bool isFinished = wait(taskBucket->isFinished(tr, task)); + bool isFinished = co_await taskBucket->isFinished(tr, task); if (!isFinished) { - wait(taskBucket->finish(tr, task)); + co_await taskBucket->finish(tr, task); } - wait(tr->commit()); - return true; + co_await tr->commit(); + co_return true; } break; } catch (Error& e) { - wait(tr->onError(e)); + innerErr = e; } + co_await tr->onError(innerErr); } } - wait(taskFunc->execute(cx, taskBucket, futureBucket, task) || - extendTimeoutRepeatedly(cx, taskBucket, task)); + co_await (taskFunc->execute(cx, taskBucket, futureBucket, task) || + extendTimeoutRepeatedly(cx, taskBucket, task)); if (BUGGIFY) - wait(delay(10.0)); - wait(runRYWTransaction(cx, [=](Reference tr) { + co_await delay(10.0); + co_await runRYWTransaction(cx, [=](Reference tr) { return finishTaskRun(tr, taskBucket, futureBucket, task, taskFunc, verifyTask); - })); + }); } + co_return true; } catch (Error& e) { - TraceEvent(SevWarn, "TaskBucketExecuteFailure") - .error(e) - .detail("TaskUID", task->key) + err = e; + } + TraceEvent(SevWarn, "TaskBucketExecuteFailure") + .error(err) + .detail("TaskUID", task->key) + .detail("TaskType", task->params[Task::reservedTaskParamKeyType].printable()) + .detail("Priority", task->getPriority()); + try { + co_await taskFunc->handleError(cx, task, err); + } catch (Error& handleErr) { + TraceEvent(SevWarn, "TaskBucketExecuteFailureLogErrorFailed") + .error(handleErr) // output handleError() error instead of original task error + .detail("TaskUID", task->key.printable()) .detail("TaskType", task->params[Task::reservedTaskParamKeyType].printable()) .detail("Priority", task->getPriority()); - try { - wait(taskFunc->handleError(cx, task, e)); - } catch (Error& e) { - TraceEvent(SevWarn, "TaskBucketExecuteFailureLogErrorFailed") - .error(e) // output handleError() error instead of original task error - .detail("TaskUID", task->key.printable()) - .detail("TaskType", task->params[Task::reservedTaskParamKeyType].printable()) - .detail("Priority", task->getPriority()); - } } // Return true to indicate that we did work. - return true; + co_return true; } - ACTOR static Future dispatch(Database cx, - Reference taskBucket, - Reference futureBucket, - std::shared_ptr pollDelay, - int maxConcurrentTasks) { - state std::vector> tasks(maxConcurrentTasks); + static Future dispatch(Database cx, + Reference taskBucket, + Reference futureBucket, + std::shared_ptr pollDelay, + int maxConcurrentTasks) { + std::vector> tasks(maxConcurrentTasks); for (auto& f : tasks) f = Never(); // Since the futures have to be kept in a vector to be compatible with waitForAny(), we'll keep a queue // of available slots in it. Initially, they're all available. - state std::vector availableSlots; + std::vector availableSlots; availableSlots.reserve(tasks.size()); for (int i = 0; i < tasks.size(); ++i) availableSlots.push_back(i); - state std::vector>> getTasks; - state unsigned int getBatchSize = 1; + std::vector>> getTasks; + unsigned int getBatchSize = 1; - loop { + while (true) { // Start running tasks while slots are available and we keep finding work to do ++taskBucket->dispatchSlotChecksStarted; while (!availableSlots.empty()) { getTasks.clear(); for (int i = 0, imax = std::min(getBatchSize, availableSlots.size()); i < imax; ++i) getTasks.push_back(taskBucket->getOne(cx)); - wait(waitForAllReady(getTasks)); + co_await waitForAllReady(getTasks); bool done = false; for (int i = 0; i < getTasks.size(); ++i) { @@ -540,7 +537,7 @@ class TaskBucketImpl { w = w || delay(*pollDelay * (0.9 + deterministicRandom()->random01() / 5)); // Jittered by 20 %, so +/- 10% } - wait(w); + co_await w; // Check all of the task slots, any that are finished should be replaced with Never() and their slots added // back to availableSlots @@ -553,40 +550,41 @@ class TaskBucketImpl { } } - ACTOR static Future watchPaused(Database cx, - Reference taskBucket, - Reference> paused) { - loop { - state Reference tr(new ReadYourWritesTransaction(cx)); + static Future watchPaused(Database cx, Reference taskBucket, Reference> paused) { + while (true) { + auto tr = makeReference(cx); + Error err; try { taskBucket->setOptions(tr); - Optional pausedVal = wait(tr->get(taskBucket->pauseKey)); + Optional pausedVal = co_await tr->get(taskBucket->pauseKey); paused->set(pausedVal.present()); - state Future watchPausedFuture = tr->watch(taskBucket->pauseKey); - wait(tr->commit()); - wait(watchPausedFuture); + Future watchPausedFuture = tr->watch(taskBucket->pauseKey); + co_await tr->commit(); + co_await watchPausedFuture; + continue; } catch (Error& e) { - wait(tr->onError(e)); + err = e; } + co_await tr->onError(err); } } - ACTOR static Future run(Database cx, - Reference taskBucket, - Reference futureBucket, - std::shared_ptr pollDelay, - int maxConcurrentTasks) { - state Reference> paused = makeReference>(true); - state Future watchPausedFuture = watchPaused(cx, taskBucket, paused); + static Future run(Database cx, + Reference taskBucket, + Reference futureBucket, + std::shared_ptr pollDelay, + int maxConcurrentTasks) { + Reference> paused = makeReference>(true); + Future watchPausedFuture = watchPaused(cx, taskBucket, paused); taskBucket->metricLogger = taskBucket->cc.traceCounters( "TaskBucketMetrics", taskBucket->dbgid, CLIENT_KNOBS->TASKBUCKET_LOGGING_DELAY); - loop { + while (true) { while (paused->get()) { - wait(paused->onChange() || watchPausedFuture); + co_await (paused->onChange() || watchPausedFuture); } - wait(dispatch(cx, taskBucket, futureBucket, pollDelay, maxConcurrentTasks) || paused->onChange() || - watchPausedFuture); + co_await (dispatch(cx, taskBucket, futureBucket, pollDelay, maxConcurrentTasks) || paused->onChange() || + watchPausedFuture); } } @@ -602,148 +600,149 @@ class TaskBucketImpl { return runRYWTransaction(cx, [=](Reference tr) { return addIdle(tr, taskBucket); }); } - ACTOR static Future isEmpty(Reference tr, Reference taskBucket) { + static Future isEmpty(Reference tr, Reference taskBucket) { taskBucket->setOptions(tr); // Check all available priorities for keys - state std::vector> resultFutures; + std::vector> resultFutures; for (int pri = 0; pri <= CLIENT_KNOBS->TASKBUCKET_MAX_PRIORITY; ++pri) resultFutures.push_back(tr->getRange(taskBucket->getAvailableSpace(pri).range(), 1)); // If any priority levels have any keys then the taskbucket is not empty so return false - state int i; - for (i = 0; i < resultFutures.size(); ++i) { - RangeResult results = wait(resultFutures[i]); + for (int i = 0; i < resultFutures.size(); ++i) { + RangeResult results = co_await resultFutures[i]; if (results.size() > 0) - return false; + co_return false; } - RangeResult values = wait(tr->getRange(taskBucket->timeouts.range(), 1)); + RangeResult values = co_await tr->getRange(taskBucket->timeouts.range(), 1); if (values.size() > 0) - return false; + co_return false; - return true; + co_return true; } - ACTOR static Future isBusy(Reference tr, Reference taskBucket) { + static Future isBusy(Reference tr, Reference taskBucket) { taskBucket->setOptions(tr); // Check all available priorities for emptiness - state std::vector> resultFutures; + std::vector> resultFutures; for (int pri = 0; pri <= CLIENT_KNOBS->TASKBUCKET_MAX_PRIORITY; ++pri) resultFutures.push_back(tr->getRange(taskBucket->getAvailableSpace(pri).range(), 1)); // If any priority levels have any keys then return true as the level is 'busy' - state int i; - for (i = 0; i < resultFutures.size(); ++i) { - RangeResult results = wait(resultFutures[i]); + for (int i = 0; i < resultFutures.size(); ++i) { + RangeResult results = co_await resultFutures[i]; if (results.size() > 0) - return true; + co_return true; } - return false; + co_return false; } // Verify that the task's keys are still in the timeout space at the expected timeout prefix - ACTOR static Future isFinished(Reference tr, - Reference taskBucket, - Reference task) { + static Future isFinished(Reference tr, + Reference taskBucket, + Reference task) { taskBucket->setOptions(tr); Tuple t = Tuple::makeTuple(task->timeoutVersion, task->key); - RangeResult values = wait(tr->getRange(taskBucket->timeouts.range(t), 1)); + RangeResult values = co_await tr->getRange(taskBucket->timeouts.range(t), 1); if (values.size() > 0) - return false; + co_return false; - return true; + co_return true; } - ACTOR static Future getActiveKey(Reference tr, - Reference taskBucket, - Optional startingValue) { + static Future getActiveKey(Reference tr, + Reference taskBucket, + Optional startingValue) { taskBucket->setOptions(tr); - Optional new_value = wait(tr->get(taskBucket->active.key())); + Optional new_value = co_await tr->get(taskBucket->active.key()); if (new_value != startingValue) { - return true; + co_return true; } - return false; + co_return false; } - ACTOR static Future checkActive(Database cx, Reference taskBucket) { - state Reference tr(new ReadYourWritesTransaction(cx)); - state Optional startingValue; + static Future checkActive(Database cx, Reference taskBucket) { + Reference tr(new ReadYourWritesTransaction(cx)); + Optional startingValue; - loop { + while (true) { + Error err; try { taskBucket->setOptions(tr); - bool is_busy = wait(isBusy(tr, taskBucket)); + bool is_busy = co_await isBusy(tr, taskBucket); if (!is_busy) { - wait(success(addIdle(tr, taskBucket))); + co_await success(addIdle(tr, taskBucket)); } - Optional val = wait(tr->get(taskBucket->active.key())); + Optional val = co_await tr->get(taskBucket->active.key()); startingValue = val; - wait(tr->commit()); + co_await tr->commit(); break; } catch (Error& e) { - wait(tr->onError(e)); + err = e; } + co_await tr->onError(err); } - state int idx = 0; + int idx = 0; for (; idx < CLIENT_KNOBS->TASKBUCKET_CHECK_ACTIVE_AMOUNT; ++idx) { tr = makeReference(cx); - loop { + while (true) { + Error err; try { taskBucket->setOptions(tr); - wait(delay(CLIENT_KNOBS->TASKBUCKET_CHECK_ACTIVE_DELAY)); - bool isActiveKey = wait(getActiveKey(tr, taskBucket, startingValue)); + co_await delay(CLIENT_KNOBS->TASKBUCKET_CHECK_ACTIVE_DELAY); + bool isActiveKey = co_await getActiveKey(tr, taskBucket, startingValue); if (isActiveKey) { CODE_PROBE(true, "checkActive return true"); - return true; + co_return true; } break; } catch (Error& e) { - wait(tr->onError(e)); + err = e; } + co_await tr->onError(err); } } CODE_PROBE(true, "checkActive return false"); - return false; + co_return false; } - ACTOR static Future getTaskCount(Reference tr, - Reference taskBucket) { + static Future getTaskCount(Reference tr, Reference taskBucket) { taskBucket->setOptions(tr); - Optional val = wait(tr->get(taskBucket->prefix.pack("task_count"_sr))); + Optional val = co_await tr->get(taskBucket->prefix.pack("task_count"_sr)); if (!val.present()) - return 0; + co_return 0; ASSERT(val.get().size() == sizeof(int64_t)); int64_t intValue = 0; memcpy(&intValue, val.get().begin(), val.get().size()); - return intValue; + co_return intValue; } // Looks for tasks that have timed out and returns them to be available tasks. // Returns True if any tasks were affected. - ACTOR static Future requeueTimedOutTasks(Reference tr, - Reference taskBucket) { + static Future requeueTimedOutTasks(Reference tr, + Reference taskBucket) { CODE_PROBE(true, "Looks for tasks that have timed out and returns them to be available tasks."); - Version end = wait(tr->getReadVersion()); - state KeyRange range( + Version end = co_await tr->getReadVersion(); + KeyRange range( KeyRangeRef(taskBucket->timeouts.get(0).range().begin, taskBucket->timeouts.get(end).range().end)); - RangeResult values = wait(tr->getRange(range, CLIENT_KNOBS->TASKBUCKET_MAX_TASK_KEYS)); + RangeResult values = co_await tr->getRange(range, CLIENT_KNOBS->TASKBUCKET_MAX_TASK_KEYS); // Keys will be tuples of (taskUID, param) -> paramValue // Unfortunately we need to know the priority parameter for a taskUID before we can know which available-tasks @@ -781,20 +780,20 @@ class TaskBucketImpl { if (values.size() > 0) { tr->clear(range); - return true; + co_return true; } - return false; + co_return false; } ASSERT(lastKey != Key()); tr->clear(KeyRangeRef(range.begin, lastKey)); - return true; + co_return true; } - ACTOR static Future debugPrintRange(Reference tr, Subspace subspace, Key msg) { + static Future debugPrintRange(Reference tr, Subspace subspace, Key msg) { tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::LOCK_AWARE); - RangeResult values = wait(tr->getRange(subspace.range(), CLIENT_KNOBS->TOO_MANY)); + RangeResult values = co_await tr->getRange(subspace.range(), CLIENT_KNOBS->TOO_MANY); TraceEvent("TaskBucketDebugPrintRange") .detail("Key", subspace.key()) .detail("Count", values.size()) @@ -807,24 +806,22 @@ class TaskBucketImpl { .detail("Key", s.key) .detail("Value", s.value); }*/ - - return Void(); } - ACTOR static Future extendTimeout(Reference tr, - Reference taskBucket, - Reference task, - UpdateParams updateParams, - Version newTimeoutVersion) { + static Future extendTimeout(Reference tr, + Reference taskBucket, + Reference task, + UpdateParams updateParams, + Version newTimeoutVersion) { taskBucket->setOptions(tr); // First make sure it's safe to keep running - wait(taskBucket->keepRunning(tr, task)); + co_await taskBucket->keepRunning(tr, task); // This is where the task definition currently exists - state Subspace oldTimeoutSpace = taskBucket->timeouts.get(task->timeoutVersion).get(task->key); + Subspace oldTimeoutSpace = taskBucket->timeouts.get(task->timeoutVersion).get(task->key); // Update the task's timeout - Version version = wait(tr->getReadVersion()); + Version version = co_await tr->getReadVersion(); if (newTimeoutVersion == invalidVersion) newTimeoutVersion = version + taskBucket->timeout; @@ -838,7 +835,7 @@ class TaskBucketImpl { } // This is where the task definition is being moved to - state Subspace newTimeoutSpace = taskBucket->timeouts.get(newTimeoutVersion).get(task->key); + Subspace newTimeoutSpace = taskBucket->timeouts.get(newTimeoutVersion).get(task->key); tr->addReadConflictRange(oldTimeoutSpace.range()); tr->addWriteConflictRange(newTimeoutSpace.range()); @@ -852,7 +849,7 @@ class TaskBucketImpl { } else { CODE_PROBE(true, "Extended a task without updating parameters"); // Otherwise, read and transplant the params from the old to new timeout spaces - RangeResult params = wait(tr->getRange(oldTimeoutSpace.range(), CLIENT_KNOBS->TOO_MANY)); + RangeResult params = co_await tr->getRange(oldTimeoutSpace.range(), CLIENT_KNOBS->TOO_MANY); for (auto& kv : params) { Tuple paramKey = oldTimeoutSpace.unpack(kv.key); tr->set(newTimeoutSpace.pack(paramKey), kv.value); @@ -861,7 +858,7 @@ class TaskBucketImpl { tr->clear(oldTimeoutSpace.range()); - return newTimeoutVersion; + co_return newTimeoutVersion; } }; @@ -928,13 +925,13 @@ void TaskBucket::setValidationCondition(Reference task, KeyRef vKey, KeyRe task->params[Task::reservedTaskParamValidValue] = vValue; } -ACTOR static Future actorAddTask(TaskBucket* tb, - Reference tr, - Reference task, - KeyRef validationKey) { +static Future actorAddTask(TaskBucket* tb, + Reference tr, + Reference task, + KeyRef validationKey) { tb->setOptions(tr); - Optional validationValue = wait(tr->get(validationKey)); + Optional validationValue = co_await tr->get(validationKey); if (!validationValue.present()) { TraceEvent(SevError, "TaskBucketAddTaskInvalidKey") @@ -945,7 +942,7 @@ ACTOR static Future actorAddTask(TaskBucket* tb, TaskBucket::setValidationCondition(task, validationKey, validationValue.get()); - return tb->addTask(tr, task); + co_return tb->addTask(tr, task); } Future TaskBucket::addTask(Reference tr, Reference task, KeyRef validationKey) { @@ -1032,11 +1029,11 @@ Future TaskBucket::debugPrintRange(Reference tr class FutureBucketImpl { public: - ACTOR static Future isEmpty(Reference tr, Reference futureBucket) { + static Future isEmpty(Reference tr, Reference futureBucket) { futureBucket->setOptions(tr); - Key lastKey = wait(tr->getKey(lastLessOrEqual(futureBucket->prefix.pack(maxUIDKey)))); - return !futureBucket->prefix.contains(lastKey); + Key lastKey = co_await tr->getKey(lastLessOrEqual(futureBucket->prefix.pack(maxUIDKey))); + co_return !futureBucket->prefix.contains(lastKey); } }; @@ -1071,29 +1068,27 @@ Reference FutureBucket::unpack(Key key) { class TaskFutureImpl { public: - ACTOR static Future join(Reference tr, - Reference taskBucket, - Reference taskFuture, - std::vector> vectorFuture) { + static Future join(Reference tr, + Reference taskBucket, + Reference taskFuture, + std::vector> vectorFuture) { taskFuture->futureBucket->setOptions(tr); - bool is_set = wait(isSet(tr, taskFuture)); + bool is_set = co_await isSet(tr, taskFuture); // A taskFuture cannot be already set for it to be joined with others. if (is_set) { - return Void(); + co_return; } tr->clear(taskFuture->blocks.pack(StringRef())); - wait(_join(tr, taskBucket, taskFuture, vectorFuture)); - - return Void(); + co_await _join(tr, taskBucket, taskFuture, vectorFuture); } - ACTOR static Future _join(Reference tr, - Reference taskBucket, - Reference taskFuture, - std::vector> vectorFuture) { + static Future _join(Reference tr, + Reference taskBucket, + Reference taskFuture, + std::vector> vectorFuture) { std::vector> onSetFutures; for (int i = 0; i < vectorFuture.size(); ++i) { Key key = StringRef(deterministicRandom()->randomUniqueID().toString()); @@ -1105,32 +1100,30 @@ class TaskFutureImpl { onSetFutures.push_back(vectorFuture[i]->onSet(tr, taskBucket, task)); } - wait(waitForAll(onSetFutures)); - - return Void(); + co_await waitForAll(onSetFutures); } - ACTOR static Future isSet(Reference tr, Reference taskFuture) { + static Future isSet(Reference tr, Reference taskFuture) { taskFuture->futureBucket->setOptions(tr); - RangeResult values = wait(tr->getRange(taskFuture->blocks.range(), 1)); + RangeResult values = co_await tr->getRange(taskFuture->blocks.range(), 1); if (values.size() > 0) - return false; + co_return false; - return true; + co_return true; } - ACTOR static Future onSet(Reference tr, - Reference taskBucket, - Reference taskFuture, - Reference task) { + static Future onSet(Reference tr, + Reference taskBucket, + Reference taskFuture, + Reference task) { taskFuture->futureBucket->setOptions(tr); - bool is_set = wait(isSet(tr, taskFuture)); + bool is_set = co_await isSet(tr, taskFuture); if (is_set) { CODE_PROBE(true, "is_set == true"); - wait(performAction(tr, taskBucket, taskFuture, task)); + co_await performAction(tr, taskBucket, taskFuture, task); } else { CODE_PROBE(true, "is_set == false"); Subspace callbackSpace = @@ -1139,50 +1132,44 @@ class TaskFutureImpl { tr->set(callbackSpace.pack(v.key), v.value); } } - - return Void(); } - ACTOR static Future set(Reference tr, - Reference taskBucket, - Reference taskFuture) { + static Future set(Reference tr, + Reference taskBucket, + Reference taskFuture) { taskFuture->futureBucket->setOptions(tr); tr->clear(taskFuture->blocks.range()); - wait(performAllActions(tr, taskBucket, taskFuture)); - - return Void(); + co_await performAllActions(tr, taskBucket, taskFuture); } - ACTOR static Future performAction(Reference tr, - Reference taskBucket, - Reference taskFuture, - Reference task) { + static Future performAction(Reference tr, + Reference taskBucket, + Reference taskFuture, + Reference task) { taskFuture->futureBucket->setOptions(tr); if (task && TaskFuncBase::isValidTask(task)) { Reference taskFunc = TaskFuncBase::create(task->params[Task::reservedTaskParamKeyType]); if (taskFunc.getPtr()) { - wait(taskFunc->finish(tr, taskBucket, taskFuture->futureBucket, task)); + co_await taskFunc->finish(tr, taskBucket, taskFuture->futureBucket, task); } } - - return Void(); } - ACTOR static Future performAllActions(Reference tr, - Reference taskBucket, - Reference taskFuture) { + static Future performAllActions(Reference tr, + Reference taskBucket, + Reference taskFuture) { taskFuture->futureBucket->setOptions(tr); - RangeResult values = wait(tr->getRange(taskFuture->callbacks.range(), CLIENT_KNOBS->TOO_MANY)); + RangeResult values = co_await tr->getRange(taskFuture->callbacks.range(), CLIENT_KNOBS->TOO_MANY); tr->clear(taskFuture->callbacks.range()); std::vector> actions; if (values.size() != 0) { - state Reference task(new Task()); + Reference task(new Task()); Key lastTaskID; for (auto& s : values) { Tuple t = taskFuture->callbacks.unpack(s.key); @@ -1201,32 +1188,28 @@ class TaskFutureImpl { actions.push_back(performAction(tr, taskBucket, taskFuture, task)); } - wait(waitForAll(actions)); - - return Void(); + co_await waitForAll(actions); } - ACTOR static Future onSetAddTask(Reference tr, - Reference taskBucket, - Reference taskFuture, - Reference task) { + static Future onSetAddTask(Reference tr, + Reference taskBucket, + Reference taskFuture, + Reference task) { taskFuture->futureBucket->setOptions(tr); task->params[Task::reservedTaskParamKeyAddTask] = task->params[Task::reservedTaskParamKeyType]; task->params[Task::reservedTaskParamKeyType] = "AddTask"_sr; - wait(onSet(tr, taskBucket, taskFuture, task)); - - return Void(); + co_await onSet(tr, taskBucket, taskFuture, task); } - ACTOR static Future onSetAddTask(Reference tr, - Reference taskBucket, - Reference taskFuture, - Reference task, - KeyRef validationKey) { + static Future onSetAddTask(Reference tr, + Reference taskBucket, + Reference taskFuture, + Reference task, + KeyRef validationKey) { taskFuture->futureBucket->setOptions(tr); - Optional validationValue = wait(tr->get(validationKey)); + Optional validationValue = co_await tr->get(validationKey); if (!validationValue.present()) { TraceEvent(SevError, "TaskBucketOnSetAddTaskInvalidKey") @@ -1238,9 +1221,7 @@ class TaskFutureImpl { task->params[Task::reservedTaskParamValidKey] = validationKey; task->params[Task::reservedTaskParamValidValue] = validationValue.get(); - wait(onSetAddTask(tr, taskBucket, taskFuture, task)); - - return Void(); + co_await onSetAddTask(tr, taskBucket, taskFuture, task); } static Future onSetAddTask(Reference tr, @@ -1257,18 +1238,18 @@ class TaskFutureImpl { return onSetAddTask(tr, taskBucket, taskFuture, task); } - ACTOR static Future> joinedFuture(Reference tr, - Reference taskBucket, - Reference taskFuture) { + static Future> joinedFuture(Reference tr, + Reference taskBucket, + Reference taskFuture) { taskFuture->futureBucket->setOptions(tr); std::vector> vectorFuture; // the next line means generate a new task future with different key, // but share the same prefix of futureBucket with the input taskFuture - state Reference future = taskFuture->futureBucket->future(tr); + Reference future = taskFuture->futureBucket->future(tr); vectorFuture.push_back(future); - wait(join(tr, taskBucket, taskFuture, vectorFuture)); - return future; + co_await join(tr, taskBucket, taskFuture, vectorFuture); + co_return future; } }; @@ -1341,11 +1322,11 @@ Future> TaskFuture::joinedFuture(Reference::addRef(this)); } -ACTOR Future getCompletionKey(TaskCompletionKey* self, Future> f) { - Reference taskFuture = wait(f); +Future getCompletionKey(TaskCompletionKey* self, Future> f) { + Reference taskFuture = co_await f; self->joinFuture.clear(); self->key = taskFuture->key; - return self->key.get(); + co_return self->key.get(); } Future TaskCompletionKey::get(Reference tr, Reference taskBucket) {