Merged
2 changes: 2 additions & 0 deletions docs/source/design/transfer-engine/index.md
Original file line number Diff line number Diff line change
@@ -75,6 +75,8 @@ If a connection fails due to link errors, it is removed from the
endpoint pools on both sides and re-established during the
next data transfer attempt.

Evicted and deleted endpoints are moved to an internal `waiting_list_` and reclaimed asynchronously once their outstanding slices drain. Reclaim runs on every new endpoint insertion, and additionally on a ~1 Hz heartbeat from the per-context `monitorWorker`, so the waiting list drains even under failure load where new insertions stall while evictions continue.
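The heartbeat described above can be pictured as a timestamp-gated tick inside a polling loop. The following is a minimal sketch, not the Mooncake implementation: `PeriodicTick` and its members are hypothetical names, and the caller is assumed to supply monotonic timestamps in nanoseconds.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <utility>

// Hypothetical helper: runs `action` at most once per `period_ns`, driven by
// timestamps that the caller passes in (for example, from a monitor loop).
class PeriodicTick {
   public:
    PeriodicTick(int64_t period_ns, std::function<void()> action)
        : period_ns_(period_ns), action_(std::move(action)) {
        assert(period_ns_ > 0);
    }

    // Fires the action when more than one period has elapsed since the last
    // firing. Returns true if the action ran for this timestamp.
    bool poll(int64_t now_ns) {
        if (now_ns - last_fire_ns_ <= period_ns_) return false;
        action_();
        last_fire_ns_ = now_ns;
        return true;
    }

   private:
    int64_t period_ns_;
    int64_t last_fire_ns_ = 0;
    std::function<void()> action_;
};
```

Because the tick depends only on elapsed time, the action (here, a reclaim pass) keeps running even when no new endpoints are being inserted.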

### Fault Handling
In a multi-NIC environment, one common failure scenario is the temporary unavailability of a specific NIC, while other routes may still connect two nodes.
Mooncake Store is designed to adeptly manage such temporary
3 changes: 2 additions & 1 deletion docs/source/troubleshooting/troubleshooting.md
@@ -110,9 +110,10 @@ Errors in this part usually indicate that the error occurred within the `mooncak
* hard memlock unlimited
```

7. If the error `Failed to create QP: Cannot allocate memory` is displayed, it is typically caused by too many QP have been created, reaching the driver limit. You can use `rdma resource` to trace how many QP is created. One possible way to resolve this issue:
7. If the error `Failed to create QP: Cannot allocate memory` is displayed, it is typically because too many QPs have been created, reaching the driver limit. You can use `rdma resource` to see how many QPs have been created. Possible ways to resolve this issue:
- Update Mooncake to version v0.3.5 or later
- Set the environment variable `MC_ENABLE_DEST_DEVICE_AFFINITY=1` before starting the application
- If the leak persists under sustained peer failures (many `endpoint evicted` log lines accompanying the QP growth), update to a version that includes the fix for [issue #1845](https://github.com/kvcache-ai/Mooncake/issues/1845). Prior to that fix, the endpoint store's `waiting_list_` only drained when new endpoints were inserted, so evictions under failure load accumulated QPs until the driver limit was hit. The fix adds a periodic reclaim tick to `monitorWorker`.
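The difference between insert-gated and periodic reclaim can be illustrated with a toy model. This is a hypothetical simulation, not Mooncake code: `ReclaimPolicy` and `peakWaiting` are illustrative names, and every parked endpoint is assumed to be quiescent, so any reclaim pass empties the list.

```cpp
#include <cstddef>

// Hypothetical model of when a reclaim pass is allowed to run.
struct ReclaimPolicy {
    bool on_insert;      // reclaim whenever a new endpoint is inserted
    bool periodic_tick;  // reclaim once per step regardless of insertions
};

// Each step parks `evictions_per_step` endpoints on the waiting list.
// Returns the largest waiting-list length observed over `steps` steps.
std::size_t peakWaiting(ReclaimPolicy policy, std::size_t steps,
                        std::size_t evictions_per_step,
                        bool insertions_stalled) {
    std::size_t waiting = 0;
    std::size_t peak = 0;
    for (std::size_t i = 0; i < steps; ++i) {
        waiting += evictions_per_step;
        if (waiting > peak) peak = waiting;
        const bool inserted = !insertions_stalled;
        if ((policy.on_insert && inserted) || policy.periodic_tick) {
            waiting = 0;  // all entries are assumed quiescent in this model
        }
    }
    return peak;
}
```

In this model, stalled insertions with insert-gated reclaim let the list grow linearly with the number of steps, while adding the periodic tick keeps the peak at one step's worth of evictions.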

## RDMA Transfer Period
### Recommended Troubleshooting Directions
Original file line number Diff line number Diff line change
@@ -42,6 +42,8 @@ class EndpointStore {
const std::string &peer_nic_path, RdmaContext *context) = 0;
virtual int deleteEndpoint(const std::string &peer_nic_path) = 0;
virtual void evictEndpoint() = 0;
// Takes endpoint_map_lock_; caller must not hold it (RWSpinlock is
// non-reentrant, so recursive acquisition deadlocks).
virtual void reclaimEndpoint() = 0;
virtual size_t getSize() = 0;

@@ -50,12 +52,21 @@ class EndpointStore {

// Get the total number of QPs across all endpoints
virtual size_t getTotalQPNumber() = 0;

// Number of endpoints awaiting reclaim (evicted or explicitly deleted but
// not yet destructed). Exposed for tests and for operator observability.
virtual size_t waitingListSize() const = 0;

// Test-only: push a pre-constructed endpoint into waiting_list_ so reclaim
// logic can be exercised without standing up an RDMA device.
virtual void testOnlyInsertWaiting(std::shared_ptr<RdmaEndPoint> ep) = 0;
};

// FIFO
class FIFOEndpointStore : public EndpointStore {
public:
FIFOEndpointStore(size_t max_size) : max_size_(max_size) {}
FIFOEndpointStore(size_t max_size)
: waiting_list_len_(0), max_size_(max_size) {}
std::shared_ptr<RdmaEndPoint> getEndpoint(
const std::string &peer_nic_path) override;
std::shared_ptr<RdmaEndPoint> insertEndpoint(
@@ -69,6 +80,11 @@ class FIFOEndpointStore : public EndpointStore {
int disconnectQPs() override;

size_t getTotalQPNumber() override;
size_t waitingListSize() const override {
return waiting_list_len_.load(std::memory_order_relaxed);
}
Comment on lines +83 to +85

Collaborator


waiting_list_len_ is declared as atomic but waitingListSize() returns size_t.

Contributor Author


Good catch. I promoted the counter to `std::atomic<size_t>` in both FIFO and SIEVE, so the getter is now a clean pass-through.
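For context, the pattern under discussion is a lock-protected container with an atomic mirror of its length, so that readers can query the size without taking the lock. The sketch below is a simplified illustration under assumptions: `CountedSet` is a hypothetical class, and `std::mutex` stands in for the project's `RWSpinlock`.

```cpp
#include <atomic>
#include <cstddef>
#include <memory>
#include <mutex>
#include <unordered_set>
#include <utility>

// Hypothetical container: the set is guarded by a mutex, while an atomic
// mirror of its size lets readers query the length without locking.
template <typename T>
class CountedSet {
   public:
    void insert(std::shared_ptr<T> item) {
        std::lock_guard<std::mutex> guard(lock_);
        // Count only real insertions so the mirror cannot drift on duplicates.
        if (items_.insert(std::move(item)).second) {
            len_.fetch_add(1, std::memory_order_relaxed);
        }
    }

    void erase(const std::shared_ptr<T> &item) {
        std::lock_guard<std::mutex> guard(lock_);
        if (items_.erase(item) == 1) {
            len_.fetch_sub(1, std::memory_order_relaxed);
        }
    }

    // The counter and the return type are both size_t, so no signed to
    // unsigned conversion happens in the getter.
    std::size_t size() const { return len_.load(std::memory_order_relaxed); }

   private:
    std::mutex lock_;
    std::unordered_set<std::shared_ptr<T>> items_;
    std::atomic<std::size_t> len_{0};
};
```

With a signed counter such as `std::atomic<int>`, returning the value through a `size_t` getter relies on an implicit conversion, and a negative value would surface as a very large unsigned number. Matching the two types removes that hazard.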


void testOnlyInsertWaiting(std::shared_ptr<RdmaEndPoint> ep) override;

private:
RWSpinlock endpoint_map_lock_;
@@ -78,6 +94,7 @@ class FIFOEndpointStore : public EndpointStore {
std::list<std::string> fifo_list_;

std::unordered_set<std::shared_ptr<RdmaEndPoint>> waiting_list_;
std::atomic<size_t> waiting_list_len_;

size_t max_size_;
};
@@ -100,6 +117,11 @@ class SIEVEEndpointStore : public EndpointStore {
int disconnectQPs() override;

size_t getTotalQPNumber() override;
size_t waitingListSize() const override {
return waiting_list_len_.load(std::memory_order_relaxed);
}

void testOnlyInsertWaiting(std::shared_ptr<RdmaEndPoint> ep) override;

private:
RWSpinlock endpoint_map_lock_;
@@ -113,7 +135,7 @@ class SIEVEEndpointStore : public EndpointStore {
std::optional<std::list<std::string>::iterator> hand_;

std::unordered_set<std::shared_ptr<RdmaEndPoint>> waiting_list_;
std::atomic<int> waiting_list_len_;
std::atomic<size_t> waiting_list_len_;

size_t max_size_;
};
Original file line number Diff line number Diff line change
@@ -101,6 +101,21 @@ class RdmaContext {

int deleteEndpoint(const std::string &peer_nic_path);

// Drain the endpoint store's waiting list. Safe to call on any thread;
// intended to be invoked periodically from monitorWorker so reclaim is
// not gated on new endpoint insertions (which can stall under failure
// load while evictions/deletions continue). See issue #1845.
void reclaimEndpoints();

// Number of endpoints awaiting reclaim. For tests and operator
// observability.
size_t waitingListSize() const;

// Test-only: push a pre-constructed endpoint into the store's
// waiting_list_ so the reclaim path can be exercised without standing up
// a real RDMA QP.
void testOnlyInsertWaiting(std::shared_ptr<RdmaEndPoint> ep);

int disconnectAllEndpoints();

// Get the total number of QPs across all endpoints in this context
Original file line number Diff line number Diff line change
@@ -70,6 +70,7 @@ int FIFOEndpointStore::deleteEndpoint(const std::string &peer_nic_path) {
// remove endpoint but leaving it status unchanged
// in case it is setting up connection or submitting slice
if (iter != endpoint_map_.end()) {
waiting_list_len_++;
waiting_list_.insert(iter->second);
iter->second->set_active(false);
endpoint_map_.erase(iter);
@@ -86,17 +87,20 @@ void FIFOEndpointStore::evictEndpoint() {
fifo_list_.pop_front();
fifo_map_.erase(victim);
LOG(INFO) << victim << " evicted";
waiting_list_len_++;
waiting_list_.insert(endpoint_map_[victim]);
endpoint_map_.erase(victim);
return;
}

void FIFOEndpointStore::reclaimEndpoint() {
    if (waiting_list_len_.load(std::memory_order_relaxed) == 0) return;
    RWSpinlock::WriteGuard guard(endpoint_map_lock_);
    std::vector<std::shared_ptr<RdmaEndPoint>> to_delete;
    for (auto &endpoint : waiting_list_)
        if (!endpoint->hasOutstandingSlice()) to_delete.push_back(endpoint);
    for (auto &endpoint : to_delete) waiting_list_.erase(endpoint);
    waiting_list_len_ -= to_delete.size();
}

size_t FIFOEndpointStore::getSize() { return endpoint_map_.size(); }
@@ -124,6 +128,13 @@ size_t FIFOEndpointStore::getTotalQPNumber() {
return total_qps;
}

void FIFOEndpointStore::testOnlyInsertWaiting(
    std::shared_ptr<RdmaEndPoint> ep) {
    RWSpinlock::WriteGuard guard(endpoint_map_lock_);
    waiting_list_.insert(ep);
    waiting_list_len_++;
}

std::shared_ptr<RdmaEndPoint> SIEVEEndpointStore::getEndpoint(
const std::string &peer_nic_path) {
RWSpinlock::ReadGuard guard(endpoint_map_lock_);
@@ -240,6 +251,13 @@ int SIEVEEndpointStore::disconnectQPs() {

size_t SIEVEEndpointStore::getSize() { return endpoint_map_.size(); }

void SIEVEEndpointStore::testOnlyInsertWaiting(
    std::shared_ptr<RdmaEndPoint> ep) {
    RWSpinlock::WriteGuard guard(endpoint_map_lock_);
    waiting_list_.insert(ep);
    waiting_list_len_++;
}

size_t SIEVEEndpointStore::getTotalQPNumber() {
RWSpinlock::ReadGuard guard(endpoint_map_lock_);
size_t total_qps = 0;
Original file line number Diff line number Diff line change
@@ -365,6 +365,16 @@ int RdmaContext::deleteEndpoint(const std::string &peer_nic_path) {
return endpoint_store_->deleteEndpoint(peer_nic_path);
}

void RdmaContext::reclaimEndpoints() { endpoint_store_->reclaimEndpoint(); }

size_t RdmaContext::waitingListSize() const {
return endpoint_store_->waitingListSize();
}

void RdmaContext::testOnlyInsertWaiting(std::shared_ptr<RdmaEndPoint> ep) {
endpoint_store_->testOnlyInsertWaiting(std::move(ep));
}

size_t RdmaContext::getTotalQPNumber() const {
return endpoint_store_->getTotalQPNumber();
}
Original file line number Diff line number Diff line change
@@ -447,6 +447,11 @@ void WorkerPool::monitorWorker() {
auto current_ts = getCurrentTimeInNano();
if (current_ts - last_reset_ts > 1000000000ll) {
context_.set_active(true);
// Drain endpoint_store_->waiting_list_ even when no new
// insertions are happening. Without this, reclaim only runs
// from RdmaContext::endpoint() and the waiting list grows
// unboundedly under failure load. See issue #1845.
context_.reclaimEndpoints();

Collaborator


Previously reclaimEndpoint() was only called from within insertEndpoint(), which held endpoint_map_lock_. Now it would be called without the lock. Please confirm that reclaimEndpoint()'s lock contract has not changed.

Contributor Author


The lock contract of reclaimEndpoint() is unchanged:

  • reclaimEndpoint() still self-locks using WriteGuard(endpoint_map_lock_).
  • At the existing call site (rdma_context.cpp:355-356), the guard taken inside insertEndpoint() has already been released by its destructor before reclaimEndpoint() runs.
  • The new caller in monitorWorker() follows the same contract: it holds no EndpointStore locks.
  • The insertEndpoint() definitions in FIFO, SIEVE, and UB self-lock and call only evictEndpoint(), which is caller-locked. None of them calls reclaimEndpoint().

I added a comment to make this contract clear to future contributors.
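The contract discussed in this thread (self-locking public methods, caller-locked helpers, and a non-reentrant lock) can be sketched in isolation. Everything below is hypothetical and simplified: `SpinLock` is a stand-in for `RWSpinlock`, and `TinyStore` is not the real endpoint store.

```cpp
#include <atomic>
#include <cstddef>
#include <deque>
#include <mutex>

// Minimal non-reentrant spinlock (hypothetical stand-in for RWSpinlock).
class SpinLock {
   public:
    bool try_lock() { return !flag_.test_and_set(std::memory_order_acquire); }
    void lock() {
        while (!try_lock()) {
        }
    }
    void unlock() { flag_.clear(std::memory_order_release); }

   private:
    std::atomic_flag flag_ = ATOMIC_FLAG_INIT;
};

class TinyStore {
   public:
    explicit TinyStore(std::size_t capacity) : capacity_(capacity) {}

    // Self-locking. Calls only the caller-locked helper, never reclaim().
    void insert(int value) {
        std::lock_guard<SpinLock> guard(lock_);
        if (!live_.empty() && live_.size() >= capacity_) evictLocked();
        live_.push_back(value);
    }

    // Self-locking: the caller must not already hold lock_, because a second
    // acquisition on the same thread would spin forever.
    std::size_t reclaim() {
        std::lock_guard<SpinLock> guard(lock_);
        const std::size_t reclaimed = waiting_.size();
        waiting_.clear();
        return reclaimed;
    }

   private:
    // Caller-locked: must be called with lock_ held.
    void evictLocked() {
        waiting_.push_back(live_.front());
        live_.pop_front();
    }

    SpinLock lock_;
    std::size_t capacity_;
    std::deque<int> live_;
    std::deque<int> waiting_;
};
```

Keeping the two kinds of method visibly distinct (here through the `Locked` suffix, an illustrative convention) makes it easier to check that no self-locking method is reached while the lock is held.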

last_reset_ts = current_ts;
}
struct epoll_event event;
11 changes: 11 additions & 0 deletions mooncake-transfer-engine/tests/CMakeLists.txt
@@ -16,6 +16,17 @@ add_executable(transport_uint_test ${WORKSPACE}/transport_uint_test.cpp)
target_link_libraries(transport_uint_test PUBLIC transfer_engine gtest gtest_main )
add_test(NAME transport_uint_test COMMAND transport_uint_test)

add_executable(endpoint_store_test ${WORKSPACE}/endpoint_store_test.cpp)
target_link_libraries(endpoint_store_test PUBLIC transfer_engine gtest gtest_main)
add_test(NAME endpoint_store_test COMMAND endpoint_store_test)

# Integration test for the monitorWorker reclaim tick (issue #1845).
# Self-skips when no RDMA device is present, so safe to register with ctest.
add_executable(endpoint_store_integration_test ${WORKSPACE}/endpoint_store_integration_test.cpp)
target_link_libraries(endpoint_store_integration_test PUBLIC transfer_engine gtest gtest_main)
add_test(NAME endpoint_store_integration_test COMMAND endpoint_store_integration_test)
set_tests_properties(endpoint_store_integration_test PROPERTIES LABELS "rdma")

add_executable(rdma_transport_test2 ${WORKSPACE}/rdma_transport_test2.cpp)
target_link_libraries(rdma_transport_test2 PUBLIC transfer_engine gtest gtest_main )
# add_test(NAME rdma_transport_test2 COMMAND rdma_transport_test2)
124 changes: 124 additions & 0 deletions mooncake-transfer-engine/tests/endpoint_store_integration_test.cpp
@@ -0,0 +1,124 @@
// Copyright 2026 KVCache.AI
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Integration test for #1845. Verifies the end-to-end wiring of the fix:
// WorkerPool::monitorWorker actually calls RdmaContext::reclaimEndpoints at
// ~1 Hz, causing quiescent entries in the endpoint store's waiting_list_ to
// drain without any further insertion traffic. The unit tests in
// endpoint_store_test.cpp verify the reclaim method itself; this file
// verifies that the scheduler invokes it.
//
// Requires an RDMA device. Passes on soft-RoCE (`rdma_rxe`) as well as real
// NICs. Self-skips (GTEST_SKIP) when no device is present, so it is safe to
// register with ctest on CI runners without RDMA.
//
// Environment override: set MC_TEST_DEVICE_NAME to force a specific device;
// otherwise the first device returned by ibv_get_device_list is used.

#include <gtest/gtest.h>
#include <infiniband/verbs.h>

#include <chrono>
#include <cstdlib>
#include <memory>
#include <string>
#include <thread>

#include "config.h"
#include "transport/rdma_transport/endpoint_store.h"
#include "transport/rdma_transport/rdma_context.h"
#include "transport/rdma_transport/rdma_endpoint.h"
#include "transport/rdma_transport/rdma_transport.h"

#if defined(__has_feature)
#define MC_HAS_FEATURE(x) __has_feature(x)
#else
#define MC_HAS_FEATURE(x) 0
#endif
#if defined(__SANITIZE_ADDRESS__) || MC_HAS_FEATURE(address_sanitizer)
#include <sanitizer/lsan_interface.h>
#define MC_LSAN_IGNORE_OBJECT(p) __lsan_ignore_object(p)
#else
#define MC_LSAN_IGNORE_OBJECT(p) ((void)(p))
#endif

using namespace mooncake;

namespace {

std::string pickRdmaDevice() {
    const char *override_name = std::getenv("MC_TEST_DEVICE_NAME");
    if (override_name && *override_name) return override_name;
    int num_devices = 0;
    ibv_device **list = ibv_get_device_list(&num_devices);
    if (!list) return "";
    // Free the list on every path, including when it is empty.
    std::string name;
    if (num_devices > 0) name = ibv_get_device_name(list[0]);
    ibv_free_device_list(list);
    return name;
}

// Build an RdmaEndPoint with no QPs and active_=false. The store's reclaim
// path only inspects hasOutstandingSlice(), which for an endpoint with empty
// qp_list_ reduces to !active_. Safe to destruct because qp_list_ is empty.
std::shared_ptr<RdmaEndPoint> makeQuiescentEndpoint(RdmaContext &ctx) {
    auto ep = std::make_shared<RdmaEndPoint>(ctx);
    ep->set_active(false);
    return ep;
}

// Verifies the full fix wiring: after construct() spawns monitorWorker, a
// quiescent entry injected into the store's waiting_list_ is drained by the
// scheduler within ~1.5 s with no further insertion traffic.
TEST(EndpointStoreIntegration, MonitorWorkerTickDrainsWaitingList) {
    const std::string device = pickRdmaDevice();
    if (device.empty()) {
        GTEST_SKIP() << "no RDMA device available; integration test requires "
                        "rxe0, mlx5, or similar. Set MC_TEST_DEVICE_NAME to "
                        "override.";
    }

    // RdmaTransport's destructor dereferences metadata_ which is null until
    // init(); leak the engine to avoid touching that path. Marked ignored so
    // LSAN under ASAN builds doesn't flag this intentional leak.
    auto *transport = new RdmaTransport();
    MC_LSAN_IGNORE_OBJECT(transport);
    auto context = std::make_shared<RdmaContext>(*transport, device);
    auto &config = globalConfig();
    int rc = context->construct(config.num_cq_per_ctx,
                                config.num_comp_channels_per_ctx, config.port,
                                config.gid_index, config.max_cqe,
                                /*max_endpoints=*/4);
    if (rc != 0) {
        GTEST_SKIP() << "RdmaContext::construct failed on device " << device
                     << " (rc=" << rc << "); no usable RDMA device on this "
                     << "host (e.g., CI runners may enumerate a phantom "
                     << "mlx5_0 without a working port).";
    }

    context->testOnlyInsertWaiting(makeQuiescentEndpoint(*context));
    context->testOnlyInsertWaiting(makeQuiescentEndpoint(*context));
    context->testOnlyInsertWaiting(makeQuiescentEndpoint(*context));
    ASSERT_EQ(context->waitingListSize(), 3u);

    // monitorWorker's reclaim tick fires every ~1 s. Give it enough margin
    // for scheduling jitter but keep the test fast.
    std::this_thread::sleep_for(std::chrono::milliseconds(1500));

    EXPECT_EQ(context->waitingListSize(), 0u)
        << "monitorWorker must call reclaimEndpoints within ~1 s. If this "
           "fails, either the periodic tick in worker_pool.cpp was removed or "
           "reclaim is failing on quiescent entries.";
}

} // namespace
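The test above waits with a fixed 1500 ms sleep. One possible alternative, shown here only as a sketch and not as the approach taken in this PR, is to poll the condition against a deadline so that the test returns as soon as the list drains. `waitUntil` is a hypothetical helper name.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <thread>

// Hypothetical helper: polls `done` every `step` until it returns true or
// `timeout` elapses. Returns true if the condition was met in time.
bool waitUntil(const std::function<bool()> &done,
               std::chrono::milliseconds timeout,
               std::chrono::milliseconds step = std::chrono::milliseconds(10)) {
    assert(step.count() > 0);
    const auto deadline = std::chrono::steady_clock::now() + timeout;
    while (!done()) {
        if (std::chrono::steady_clock::now() >= deadline) return false;
        std::this_thread::sleep_for(step);
    }
    return true;
}
```

In a test, the predicate would be something like `[&] { return context->waitingListSize() == 0; }`, with the timeout chosen to cover at least one reclaim period plus scheduling jitter.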