diff --git a/kv_cache_manager/manager/cache_reclaimer.cc b/kv_cache_manager/manager/cache_reclaimer.cc index e50bfb11..693fded4 100644 --- a/kv_cache_manager/manager/cache_reclaimer.cc +++ b/kv_cache_manager/manager/cache_reclaimer.cc @@ -966,7 +966,7 @@ void CacheReclaimer::HandleDelRes() noexcept { it = delete_handlers_.erase_after(it_pre); } else if (const auto fs = it->fut_.wait_for(std::chrono::seconds::zero()); fs == std::future_status::ready) { try { - if (const auto [ec, err_msg] = it->fut_.get(); ec != ErrorCode::EC_OK) { + if (const auto [ec, err_msg, _] = it->fut_.get(); ec != ErrorCode::EC_OK) { LOG_WITH_ID(WARN, "reclaim request execute failed, error_code: [%d], error message: [%s]", static_cast(ec), diff --git a/kv_cache_manager/manager/reclaimer_task_supervisor.cc b/kv_cache_manager/manager/reclaimer_task_supervisor.cc index eed00e34..aad03da1 100644 --- a/kv_cache_manager/manager/reclaimer_task_supervisor.cc +++ b/kv_cache_manager/manager/reclaimer_task_supervisor.cc @@ -61,11 +61,26 @@ void ReclaimerTaskSupervisor::WorkLoop() { auto status = cell->result.wait_for(kDefaultFutureWaitTime); if (status == std::future_status::ready) { auto del_result = cell->result.get(); - KVCM_LOG_INFO("delete task finish : instance_id[%s] trace_id [%s] ec[%d] message[%s]", - cell->instance_id.c_str(), - cell->trace_id.c_str(), - del_result.status, - del_result.error_message.c_str()); + if (del_result.status != ErrorCode::EC_OK) { + // retry + CacheLocationDelRequest request; + request.instance_id = cell->instance_id; + request.delay = std::chrono::seconds(0); + for (const auto &meta : del_result.fail_metas) { + request.block_keys.push_back(meta.block_key); + request.location_ids.push_back(meta.location_ids); + } + cell->result = schedule_plan_executor_->Submit(request); + if (cell->result.valid()) { + cell_queue_.Push(cell); + } + } else { + KVCM_LOG_INFO("delete task finish : instance_id[%s] trace_id [%s] ec[%d] message[%s]", + cell->instance_id.c_str(), + cell->trace_id.c_str(), + del_result.status, + del_result.error_message.c_str()); + } } else { cell_queue_.Push(cell); } diff --git a/kv_cache_manager/manager/schedule_plan_executor.cc b/kv_cache_manager/manager/schedule_plan_executor.cc index 8dceea98..cac43a86 100644 --- a/kv_cache_manager/manager/schedule_plan_executor.cc +++ b/kv_cache_manager/manager/schedule_plan_executor.cc @@ -32,6 +32,7 @@ void HandleErrorPromise(const std::shared_ptr> &promise promise->set_value(ResultType{ .status = error_code, .error_message = std::move(error_message), + .fail_metas = {}, }); } } // namespace @@ -210,10 +211,14 @@ void SchedulePlanExecutor::DoLocationDelTask(const std::shared_ptr status_vec; + std::vector location_ids; for (size_t block_key_idx = 0; block_key_idx < delete_meta_results.size(); ++block_key_idx) { auto &results = delete_meta_results[block_key_idx]; for (size_t location_idx = 0; location_idx < results.size(); location_idx++) { if (results[location_idx] != ErrorCode::EC_OK) { + status_vec.push_back(results[location_idx]); + location_ids.push_back(batch_cad_tasks[block_key_idx][location_idx].location_id); result.status = ErrorCode::EC_PARTIAL_OK; KVCM_LOG_WARN("Failed to CAD meta key %ld, location: %s, error_code: %d", block_keys_to_delete[block_key_idx], @@ -221,6 +226,12 @@ void SchedulePlanExecutor::DoLocationDelTask(const std::shared_ptr SchedulePlanExecutor::Submit(const CacheMetaDelRe } if (batch_cas_block_keys.empty()) { - promise->set_value({ErrorCode::EC_OK, ""}); + promise->set_value(PlanExecuteResult{ErrorCode::EC_OK, "", {}}); return future; } @@ -310,7 +321,7 @@ std::future SchedulePlanExecutor::Submit(const CacheMetaDelRe return future; } if (actual_task.block_keys.empty()) { - promise->set_value(PlanExecuteResult{ErrorCode::EC_OK, ""}); + promise->set_value(PlanExecuteResult{ErrorCode::EC_OK, "", {}}); return future; } KVCM_LOG_DEBUG("Location statuses updated, submitting task to worker pool with delay: %lld microseconds", @@ -416,7 +427,7 @@ std::future SchedulePlanExecutor::Submit(const CacheLocationD batch_cas_tasks.emplace_back(std::move(location_cas_tasks)); } if (batch_cas_block_keys.empty()) { - promise->set_value(PlanExecuteResult{ErrorCode::EC_OK, ""}); + promise->set_value(PlanExecuteResult{ErrorCode::EC_OK, "", {}}); return future; } @@ -434,7 +445,7 @@ std::future SchedulePlanExecutor::Submit(const CacheLocationD return future; } if (actual_task.block_keys.empty()) { - promise->set_value(PlanExecuteResult{ErrorCode::EC_OK, ""}); + promise->set_value(PlanExecuteResult{ErrorCode::EC_OK, "", {}}); return future; } diff --git a/kv_cache_manager/manager/schedule_plan_executor.h b/kv_cache_manager/manager/schedule_plan_executor.h index 712ec328..66e2ebd8 100644 --- a/kv_cache_manager/manager/schedule_plan_executor.h +++ b/kv_cache_manager/manager/schedule_plan_executor.h @@ -36,9 +36,16 @@ struct CacheMetaDelRequest { std::chrono::microseconds delay{std::chrono::seconds(0)}; }; +struct PlanExecuteResultFailMeta { + int64_t block_key; + std::vector status_vec; + std::vector location_ids; +}; + struct PlanExecuteResult { ErrorCode status; std::string error_message; + std::vector fail_metas; }; struct CacheLocationDelRequest {