Merged
2 changes: 1 addition & 1 deletion kv_cache_manager/manager/cache_reclaimer.cc
@@ -966,7 +966,7 @@ void CacheReclaimer::HandleDelRes() noexcept {
it = delete_handlers_.erase_after(it_pre);
} else if (const auto fs = it->fut_.wait_for(std::chrono::seconds::zero()); fs == std::future_status::ready) {
try {
- if (const auto [ec, err_msg] = it->fut_.get(); ec != ErrorCode::EC_OK) {
+ if (const auto [ec, err_msg, _] = it->fut_.get(); ec != ErrorCode::EC_OK) {
LOG_WITH_ID(WARN,
"reclaim request execute failed, error_code: [%d], error message: [%s]",
static_cast<std::int32_t>(ec),
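
The one-line change above is forced by the new `fail_metas` member: a structured binding over a struct must name exactly as many identifiers as the struct has non-static data members, so a two-name binding stops compiling once a third field is added. A minimal sketch of the two access styles follows. `Result` is a hypothetical stand-in that only mirrors the shape of `PlanExecuteResult`, and both helper functions are illustrative names, not part of this PR.

```cpp
#include <cassert>
#include <string>
#include <vector>

enum class ErrorCode { EC_OK, EC_PARTIAL_OK };

// Hypothetical stand-in: same three-member shape as PlanExecuteResult after this PR.
struct Result {
    ErrorCode status;
    std::string error_message;
    std::vector<int> fail_metas;  // placeholder element type for the sketch
};

// Named member access: keeps compiling if more members are appended later.
bool FailedByMember(const Result &r) {
    return r.status != ErrorCode::EC_OK;
}

// Structured binding: the identifier count must equal the member count (C++17),
// so this function has to be edited whenever Result gains or loses a member.
bool FailedByBinding(const Result &r) {
    const auto &[ec, err_msg, fail_metas] = r;
    (void)err_msg;
    (void)fail_metas;
    return ec != ErrorCode::EC_OK;
}
```

Call sites that only need the status may be less fragile with member access, at the cost of a slightly longer expression.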
25 changes: 20 additions & 5 deletions kv_cache_manager/manager/reclaimer_task_supervisor.cc
@@ -61,11 +61,26 @@ void ReclaimerTaskSupervisor::WorkLoop() {
auto status = cell->result.wait_for(kDefaultFutureWaitTime);
if (status == std::future_status::ready) {
auto del_result = cell->result.get();
- KVCM_LOG_INFO("delete task finish : instance_id[%s] trace_id [%s] ec[%d] message[%s]",
- cell->instance_id.c_str(),
- cell->trace_id.c_str(),
- del_result.status,
- del_result.error_message.c_str());
+ if (del_result.status != ErrorCode::EC_OK) {

The retry loop here changes the semantics of delete failures in a couple of important ways that may cause issues in production.

  1. When DoLocationDelTask sets result.status to a non-OK value because of storage delete failures (e.g., Delete returning errors) or early error paths like HandleErrorPromise in the executor, fail_metas can still be empty. In that case this branch builds a CacheLocationDelRequest whose block_keys and location_ids are both empty. The SchedulePlanExecutor::Submit(const CacheLocationDelRequest&) overload treats an empty task as success and immediately sets a PlanExecuteResult{EC_OK, "", {}}. As a result, a real failure in the original delete operation is effectively converted into an apparent success on the retry, and the supervisor logs the task as finished successfully. This hides the original error from operators and makes it hard to understand why deletes sometimes fail.

  2. For true CAD partial failures where fail_metas is populated, this loop will keep resubmitting delete requests as long as del_result.status != EC_OK and Submit continues to return a valid future. There is no retry limit, no backoff, and no distinction between transient and permanent failure codes. If a backend consistently returns non-OK status for those locations, this cell can be requeued indefinitely with zero delay, continually issuing requests and warnings and potentially starving the queue.

Would it make sense to (a) only build a retry request when fail_metas is non-empty, and (b) introduce some explicit retry policy (max attempts or backoff, and possibly short-circuit on clearly permanent error codes) so we don’t turn persistent CAD errors into an unbounded tight loop?


🤖 Generated by QoderFix in Qoder
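
One way the two suggestions in the comment above could be combined is sketched below: skip the retry when there is nothing in `fail_metas`, cap the number of attempts, and back off exponentially between attempts. This is only an illustration under stated assumptions. `RetryPolicy`, `RetryDecision`, `DecideRetry`, the default values, and the simplified `FailMeta`/`DelResult` types are all hypothetical and do not exist in this PR; the real supervisor would also need to store an attempt counter on each cell.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical stand-ins, reduced to the fields the decision needs.
enum class ErrorCode { EC_OK, EC_PARTIAL_OK, EC_ERROR };

struct FailMeta {
    std::int64_t block_key;
    std::vector<std::string> location_ids;
};

struct DelResult {
    ErrorCode status;
    std::vector<FailMeta> fail_metas;
};

// Hypothetical policy knobs; the defaults are arbitrary illustration values.
struct RetryPolicy {
    int max_attempts = 3;
    std::chrono::milliseconds base_delay{100};
    std::chrono::milliseconds max_delay{5000};
};

struct RetryDecision {
    bool retry;
    std::chrono::milliseconds delay;
};

// attempts_so_far is the number of retries already issued for this task.
RetryDecision DecideRetry(const DelResult &result, int attempts_so_far, const RetryPolicy &policy) {
    const std::chrono::milliseconds no_delay{0};
    if (result.status == ErrorCode::EC_OK) {
        return {false, no_delay};  // finished, nothing to do
    }
    if (result.fail_metas.empty()) {
        // Failure without per-location detail: an empty retry request would be
        // reported as success, so surface the original error instead.
        return {false, no_delay};
    }
    if (attempts_so_far >= policy.max_attempts) {
        return {false, no_delay};  // give up and log the final failure
    }
    // Exponential backoff: base_delay * 2^attempts_so_far, capped at max_delay.
    auto delay = policy.base_delay;
    for (int i = 0; i < attempts_so_far && delay < policy.max_delay; ++i) {
        delay *= 2;
    }
    return {true, std::min(delay, policy.max_delay)};
}
```

In the work loop, a positive decision would set `request.delay` from the returned value instead of `std::chrono::seconds(0)`, and a negative decision on a non-OK status would log the failure with its original error code. Short-circuiting on permanent error codes is left out because the PR does not say which codes are permanent.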

+ // retry
+ CacheLocationDelRequest request;
+ request.instance_id = cell->instance_id;
+ request.delay = std::chrono::seconds(0);
+ for (const auto &meta : del_result.fail_metas) {
+ request.block_keys.push_back(meta.block_key);
+ request.location_ids.push_back(meta.location_ids);
+ }
+ cell->result = schedule_plan_executor_->Submit(request);
+ if (cell->result.valid()) {
+ cell_queue_.Push(cell);
+ }
+ } else {
+ KVCM_LOG_INFO("delete task finish : instance_id[%s] trace_id [%s] ec[%d] message[%s]",
+ cell->instance_id.c_str(),
+ cell->trace_id.c_str(),
+ del_result.status,
+ del_result.error_message.c_str());
+ }
} else {
cell_queue_.Push(cell);
}
19 changes: 15 additions & 4 deletions kv_cache_manager/manager/schedule_plan_executor.cc
@@ -32,6 +32,7 @@ void HandleErrorPromise(const std::shared_ptr<std::promise<ResultType>> &promise
promise->set_value(ResultType{
.status = error_code,
.error_message = std::move(error_message),
+ .fail_metas = {},
});
}
} // namespace
@@ -210,17 +211,27 @@ void SchedulePlanExecutor::DoLocationDelTask(const std::shared_ptr<std::promise<
ErrorCode delete_meta_ec =
meta_searcher.BatchCADLocationStatus(ctx.get(), block_keys_to_delete, batch_cad_tasks, delete_meta_results);
(void)delete_meta_ec; // ignore the return value
+ std::vector<ErrorCode> status_vec;
+ std::vector<std::string> location_ids;
for (size_t block_key_idx = 0; block_key_idx < delete_meta_results.size(); ++block_key_idx) {
auto &results = delete_meta_results[block_key_idx];
for (size_t location_idx = 0; location_idx < results.size(); location_idx++) {
if (results[location_idx] != ErrorCode::EC_OK) {
+ status_vec.push_back(results[location_idx]);
+ location_ids.push_back(batch_cad_tasks[block_key_idx][location_idx].location_id);
result.status = ErrorCode::EC_PARTIAL_OK;
KVCM_LOG_WARN("Failed to CAD meta key %ld, location: %s, error_code: %d",
block_keys_to_delete[block_key_idx],
batch_cad_tasks[block_key_idx][location_idx].location_id.c_str(),
results[location_idx]);
}
}
+ if (!status_vec.empty()) {
+ result.fail_metas.push_back(PlanExecuteResultFailMeta{
+ block_keys_to_delete[block_key_idx], std::move(status_vec), std::move(location_ids)});
+ status_vec.clear();
+ location_ids.clear();
+ }
}
}
KVCM_LOG_DEBUG("DoDelLocationTask completed successfully for instance_id: %s", task.instance_id.c_str());
@@ -292,7 +303,7 @@ std::future<PlanExecuteResult> SchedulePlanExecutor::Submit(const CacheMetaDelRe
}

if (batch_cas_block_keys.empty()) {
- promise->set_value({ErrorCode::EC_OK, ""});
+ promise->set_value(PlanExecuteResult{ErrorCode::EC_OK, "", {}});
return future;
}

@@ -310,7 +321,7 @@ std::future<PlanExecuteResult> SchedulePlanExecutor::Submit(const CacheMetaDelRe
return future;
}
if (actual_task.block_keys.empty()) {
- promise->set_value(PlanExecuteResult{ErrorCode::EC_OK, ""});
+ promise->set_value(PlanExecuteResult{ErrorCode::EC_OK, "", {}});
return future;
}
KVCM_LOG_DEBUG("Location statuses updated, submitting task to worker pool with delay: %lld microseconds",
@@ -416,7 +427,7 @@ std::future<PlanExecuteResult> SchedulePlanExecutor::Submit(const CacheLocationD
batch_cas_tasks.emplace_back(std::move(location_cas_tasks));
}
if (batch_cas_block_keys.empty()) {
- promise->set_value(PlanExecuteResult{ErrorCode::EC_OK, ""});
+ promise->set_value(PlanExecuteResult{ErrorCode::EC_OK, "", {}});
return future;
}

@@ -434,7 +445,7 @@ std::future<PlanExecuteResult> SchedulePlanExecutor::Submit(const CacheLocationD
return future;
}
if (actual_task.block_keys.empty()) {
- promise->set_value(PlanExecuteResult{ErrorCode::EC_OK, ""});
+ promise->set_value(PlanExecuteResult{ErrorCode::EC_OK, "", {}});
return future;
}

7 changes: 7 additions & 0 deletions kv_cache_manager/manager/schedule_plan_executor.h
@@ -36,9 +36,16 @@ struct CacheMetaDelRequest {
std::chrono::microseconds delay{std::chrono::seconds(0)};
};

+ struct PlanExecuteResultFailMeta {
+ int64_t block_key;
+ std::vector<ErrorCode> status_vec;
+ std::vector<std::string> location_ids;
+ };

struct PlanExecuteResult {
ErrorCode status;
std::string error_message;
+ std::vector<PlanExecuteResultFailMeta> fail_metas;
};

struct CacheLocationDelRequest {