Merged
Changes from 3 commits
2 changes: 2 additions & 0 deletions docs/source/design/transfer-engine/index.md
Expand Up @@ -75,6 +75,8 @@ If a connection fails due to link errors, it is removed from the
endpoint pools on both sides and re-established during the
next data transfer attempt.

Evicted and deleted endpoints are moved to an internal `waiting_list_` and reclaimed asynchronously once their outstanding slices drain. Reclaim runs on every new endpoint insertion, and additionally on a ~1 Hz heartbeat from the per-context `monitorWorker`, so the waiting list drains even under failure load where new insertions stall while evictions continue.
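
The reclaim rule described above can be sketched with a small single-threaded model. This is only an illustration under stated assumptions: `ToyEndpoint`, `ToyWaitingList`, `park`, and `remainingAfterOneTick` are hypothetical names, not Mooncake APIs, and the real store also takes a lock that is omitted here.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <unordered_set>
#include <utility>

// Hypothetical stand-in for an endpoint: it only tracks outstanding slices.
struct ToyEndpoint {
    int outstanding_slices = 0;
    bool hasOutstandingSlice() const { return outstanding_slices > 0; }
};

// Hypothetical, single-threaded model of the waiting list (no locking).
class ToyWaitingList {
   public:
    // Called when an endpoint is evicted or deleted.
    void park(std::shared_ptr<ToyEndpoint> ep) {
        waiting_.insert(std::move(ep));
    }

    // Drops every parked endpoint whose slices have drained and returns
    // how many entries were reclaimed. Busy entries stay parked.
    std::size_t reclaim() {
        std::size_t reclaimed = 0;
        for (auto it = waiting_.begin(); it != waiting_.end();) {
            if ((*it)->hasOutstandingSlice()) {
                ++it;
            } else {
                it = waiting_.erase(it);
                ++reclaimed;
            }
        }
        return reclaimed;
    }

    std::size_t size() const { return waiting_.size(); }

   private:
    std::unordered_set<std::shared_ptr<ToyEndpoint>> waiting_;
};

// Parks `n` drained endpoints plus one busy endpoint, runs a single reclaim
// pass (as one heartbeat would), and returns the number of entries left.
std::size_t remainingAfterOneTick(std::size_t n) {
    ToyWaitingList list;
    for (std::size_t i = 0; i < n; ++i) {
        list.park(std::make_shared<ToyEndpoint>());
    }
    auto busy = std::make_shared<ToyEndpoint>();
    busy->outstanding_slices = 1;
    list.park(busy);
    list.reclaim();
    return list.size();
}
```

In this model a single pass clears all drained entries regardless of whether any insertion happens, which is the property the heartbeat relies on; the busy entry is left for a later pass.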

### Fault Handling
In a multi-NIC environment, one common failure scenario is the temporary unavailability of a specific NIC, while other routes may still connect two nodes.
Mooncake Store is designed to adeptly manage such temporary
3 changes: 2 additions & 1 deletion docs/source/troubleshooting/troubleshooting.md
Expand Up @@ -110,9 +110,10 @@ Errors in this part usually indicate that the error occurred within the `mooncak
* hard memlock unlimited
```

7. If the error `Failed to create QP: Cannot allocate memory` is displayed, it is typically caused by too many QP have been created, reaching the driver limit. You can use `rdma resource` to trace how many QP is created. One possible way to resolve this issue:
7. If the error `Failed to create QP: Cannot allocate memory` is displayed, it is typically because too many QPs have been created, reaching the driver limit. You can use `rdma resource` to check how many QPs have been created. Possible ways to resolve this issue:
- Update Mooncake to version v0.3.5 or later
- Set the environment variable `MC_ENABLE_DEST_DEVICE_AFFINITY=1` before starting the application
- If the leak persists under sustained peer failures (many `endpoint evicted` log lines accompanying the QP growth), update to a version that includes the fix for [issue #1845](https://github.com/kvcache-ai/Mooncake/issues/1845). Prior to that fix, the endpoint store's `waiting_list_` only drained when new endpoints were inserted, so evictions under failure load accumulated QPs until the driver limit was hit. The fix adds a periodic reclaim tick to `monitorWorker`.

## RDMA Transfer Period
### Recommended Troubleshooting Directions
Expand Up @@ -50,6 +50,10 @@ class EndpointStore {

// Get the total number of QPs across all endpoints
virtual size_t getTotalQPNumber() = 0;

// Number of endpoints awaiting reclaim (evicted or explicitly deleted but
// not yet destructed). Exposed for tests and for operator observability.
virtual size_t waitingListSize() const = 0;
};

// FIFO
Expand All @@ -69,6 +73,7 @@ class FIFOEndpointStore : public EndpointStore {
int disconnectQPs() override;

size_t getTotalQPNumber() override;
size_t waitingListSize() const override { return waiting_list_.size(); }

Contributor


medium

The implementation of waitingListSize() in FIFOEndpointStore is not thread-safe. Accessing waiting_list_.size() on a std::unordered_set without a lock while other threads might be concurrently modifying the set (e.g., in deleteEndpoint, evictEndpoint, or reclaimEndpoint) leads to undefined behavior.

To resolve this, consider adding an atomic counter waiting_list_len_ to FIFOEndpointStore, similar to the implementation in SIEVEEndpointStore. This would allow for a lock-free and thread-safe size check. Alternatively, you could use a ReadGuard with the endpoint_map_lock_, though this would require making the lock mutable to be used within this const method.
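
The atomic-counter suggestion could look roughly like the sketch below. It is an assumption-laden illustration: `CountedSet` and `sizeAfterDemo` are hypothetical names, and `std::mutex` stands in for Mooncake's `RWSpinlock`. The container is only touched under the lock, while `size()` reads a separate atomic and therefore never races with writers.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <mutex>
#include <unordered_set>

// Hypothetical store: a locked set plus an atomic length mirror.
class CountedSet {
   public:
    void insert(int id) {
        std::lock_guard<std::mutex> guard(lock_);
        // Bump the counter only when the element was actually added.
        if (items_.insert(id).second) {
            len_.fetch_add(1, std::memory_order_relaxed);
        }
    }

    void erase(int id) {
        std::lock_guard<std::mutex> guard(lock_);
        // Decrement only when an element was actually removed.
        if (items_.erase(id) > 0) {
            len_.fetch_sub(1, std::memory_order_relaxed);
        }
    }

    // Lock-free and const: reads the atomic, never the container itself.
    std::size_t size() const { return len_.load(std::memory_order_relaxed); }

   private:
    std::mutex lock_;
    std::unordered_set<int> items_;
    std::atomic<std::size_t> len_{0};
};

// Exercises duplicate inserts and erasing an absent key; the counter should
// track the set exactly in both cases.
std::size_t sizeAfterDemo() {
    CountedSet set;
    set.insert(1);
    set.insert(1);  // duplicate: counter must not move
    set.insert(2);
    set.erase(1);
    set.erase(7);  // absent: counter must not move
    return set.size();
}
```

With relaxed ordering the value is a snapshot that may be slightly stale under concurrent writers, which is usually acceptable for observability and test assertions made after writers have quiesced.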


private:
RWSpinlock endpoint_map_lock_;
Expand Down Expand Up @@ -100,6 +105,13 @@ class SIEVEEndpointStore : public EndpointStore {
int disconnectQPs() override;

size_t getTotalQPNumber() override;
size_t waitingListSize() const override {
return waiting_list_len_.load(std::memory_order_relaxed);
}

// Test-only: push a pre-constructed endpoint into waiting_list_ so reclaim
// logic can be exercised without standing up an RDMA device.
void testOnlyInsertWaiting(std::shared_ptr<RdmaEndPoint> ep);

private:
RWSpinlock endpoint_map_lock_;
Expand Up @@ -101,6 +101,16 @@ class RdmaContext {

int deleteEndpoint(const std::string &peer_nic_path);

// Drain the endpoint store's waiting list. Safe to call on any thread;
// intended to be invoked periodically from monitorWorker so reclaim is
// not gated on new endpoint insertions (which can stall under failure
// load while evictions/deletions continue). See issue #1845.
void reclaimEndpoints();

// Raw accessor for the endpoint cache. Null before construct() runs.
// Used by integration tests that need to observe waiting_list_ directly.
EndpointStore *endpointStore() const { return endpoint_store_.get(); }

Collaborator


This function would expose a raw pointer to the EndpointStore. If it were called for other purposes in the future (not just for testing), would it be safe?

@ccs1112 ccs1112 Apr 24, 2026

Contributor Author


Yes, I had some concerns about this as well. I decided to remove this pointer and only add the necessary methods to RdmaContext: waitingListSize() and testOnlyInsertWaiting(std::shared_ptr<RdmaEndPoint> ep).

  • waitingListSize() returns by value, so there is no pointer to stash.
  • testOnlyInsertWaiting() takes a shared_ptr by value. This removes the raw pointer access across the API boundary.
  • testOnlyInsertWaiting() is lifted as a virtual function on the EndpointStore base interface. This has the added benefit of making the integration test no longer need dynamic_cast to SIEVEEndpointStore.
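
A minimal sketch of the shape described in this reply, assuming placeholder types: `FakeEndpoint`, `StoreInterface`, `FakeSieveStore`, `FakeContext`, and `injectThroughContext` are all hypothetical and only mirror the idea of forwarding through the context instead of handing out a store pointer.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

struct FakeEndpoint {};  // hypothetical placeholder for RdmaEndPoint

// Hypothetical base interface with both operations lifted onto it, so a
// caller never needs dynamic_cast to reach a concrete store type.
class StoreInterface {
   public:
    virtual ~StoreInterface() = default;
    virtual std::size_t waitingListSize() const = 0;
    virtual void testOnlyInsertWaiting(std::shared_ptr<FakeEndpoint> ep) = 0;
};

class FakeSieveStore : public StoreInterface {
   public:
    std::size_t waitingListSize() const override { return waiting_.size(); }
    void testOnlyInsertWaiting(std::shared_ptr<FakeEndpoint> ep) override {
        waiting_.push_back(std::move(ep));
    }

   private:
    std::vector<std::shared_ptr<FakeEndpoint>> waiting_;
};

// Hypothetical context: owns the store and forwards calls. No accessor
// returns the store pointer, so nothing can be stashed by a caller.
class FakeContext {
   public:
    FakeContext() : store_(std::make_unique<FakeSieveStore>()) {}

    std::size_t waitingListSize() const { return store_->waitingListSize(); }

    void testOnlyInsertWaiting(std::shared_ptr<FakeEndpoint> ep) {
        store_->testOnlyInsertWaiting(std::move(ep));
    }

   private:
    std::unique_ptr<StoreInterface> store_;
};

// Injects `n` endpoints through the context and reports the observed size.
std::size_t injectThroughContext(std::size_t n) {
    FakeContext ctx;
    for (std::size_t i = 0; i < n; ++i) {
        ctx.testOnlyInsertWaiting(std::make_shared<FakeEndpoint>());
    }
    return ctx.waitingListSize();
}
```

The trade-off is a slightly wider context interface in exchange for keeping ownership and lifetime of the store entirely inside the context.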


int disconnectAllEndpoints();

// Get the total number of QPs across all endpoints in this context
Expand Up @@ -240,6 +240,13 @@ int SIEVEEndpointStore::disconnectQPs() {

size_t SIEVEEndpointStore::getSize() { return endpoint_map_.size(); }

void SIEVEEndpointStore::testOnlyInsertWaiting(
    std::shared_ptr<RdmaEndPoint> ep) {
    RWSpinlock::WriteGuard guard(endpoint_map_lock_);
    waiting_list_.insert(ep);
    waiting_list_len_++;
}

size_t SIEVEEndpointStore::getTotalQPNumber() {
RWSpinlock::ReadGuard guard(endpoint_map_lock_);
size_t total_qps = 0;
Expand Up @@ -365,6 +365,8 @@ int RdmaContext::deleteEndpoint(const std::string &peer_nic_path) {
return endpoint_store_->deleteEndpoint(peer_nic_path);
}

void RdmaContext::reclaimEndpoints() { endpoint_store_->reclaimEndpoint(); }

size_t RdmaContext::getTotalQPNumber() const {
return endpoint_store_->getTotalQPNumber();
}
Expand Up @@ -447,6 +447,11 @@ void WorkerPool::monitorWorker() {
auto current_ts = getCurrentTimeInNano();
if (current_ts - last_reset_ts > 1000000000ll) {
context_.set_active(true);
// Drain endpoint_store_->waiting_list_ even when no new
// insertions are happening. Without this, reclaim only runs
// from RdmaContext::endpoint() and the waiting list grows
// unboundedly under failure load. See issue #1845.
context_.reclaimEndpoints();

Collaborator


Previously, reclaimEndpoint() was only called from within insertEndpoint(), which held endpoint_map_lock_. Now it would be called without the lock. Please confirm that reclaimEndpoint()'s lock contract has not changed.

Contributor Author


The lock contract of reclaimEndpoint() has not changed:

  • reclaimEndpoint() still self-locks using WriteGuard(endpoint_map_lock_).
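  • The existing caller (rdma_context.cpp:355-356) invokes reclaimEndpoint() only after insertEndpoint() has returned, by which point the insertEndpoint lock has already been released by the guard's destructor.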
  • The new caller in monitorWorker() observes the same contract as it holds no EndpointStore locks.
  • insertEndpoint() definitions in FIFO, SIEVE, and UB self-lock and only call evictEndpoint(), which is caller-locked. None call reclaimEndpoint().

I added a comment to make this contract clear to future contributors.
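
The contract discussed in this thread (public methods lock themselves, the eviction helper expects the caller to hold the lock) can be sketched as follows. This is a simplified model, not the Mooncake implementation: `ToyStore`, `evictLocked`, and `insertThenReclaim` are hypothetical names, and a non-recursive `std::mutex` stands in for `RWSpinlock`.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>
#include <vector>

class ToyStore {
   public:
    explicit ToyStore(std::size_t max_size) : max_size_(max_size) {}

    // Self-locking: takes lock_ itself, so callers must NOT already hold it.
    void insert(int id) {
        std::lock_guard<std::mutex> guard(lock_);
        if (!live_.empty() && live_.size() >= max_size_) evictLocked();
        live_.push_back(id);
    }  // guard is released here, before the caller can invoke reclaim()

    // Self-locking: callable from any thread that holds no store lock.
    // Returns the number of entries dropped from the waiting list.
    std::size_t reclaim() {
        std::lock_guard<std::mutex> guard(lock_);
        const std::size_t n = waiting_.size();
        waiting_.clear();
        return n;
    }

   private:
    // Caller-locked: requires lock_ to be held already and never locks.
    // Calling reclaim() from here would deadlock on the non-recursive mutex.
    void evictLocked() {
        waiting_.push_back(live_.front());
        live_.pop_front();
    }

    std::mutex lock_;
    std::size_t max_size_;
    std::deque<int> live_;
    std::vector<int> waiting_;
};

// Inserts `inserts` ids into a store of the given capacity, then reclaims
// from outside any lock, the way a periodic caller would.
std::size_t insertThenReclaim(std::size_t capacity, int inserts) {
    ToyStore store(capacity);
    for (int i = 0; i < inserts; ++i) store.insert(i);
    return store.reclaim();
}
```

Because `reclaim()` acquires the lock itself, adding a second caller that holds no store lock does not change the contract; only a call made while the lock is already held would be a problem.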

last_reset_ts = current_ts;
}
struct epoll_event event;
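
The timestamp gate in this hunk can be modelled in isolation. The sketch below is hypothetical (`PeriodicTick`, `due`, and `countTicks` are illustrative names, and the initial timestamp of 0 is an assumption); it only mirrors the `current_ts - last_reset_ts > 1000000000ll` comparison shown above.

```cpp
#include <cassert>
#include <cstdint>

constexpr std::int64_t kTickIntervalNs = 1000000000ll;  // 1 second

// Hypothetical helper mirroring the monitor loop's timestamp comparison.
struct PeriodicTick {
    std::int64_t last_ts_ns = 0;  // assumed starting point for this sketch

    // Returns true, and re-arms, once strictly more than one interval has
    // elapsed since the last tick.
    bool due(std::int64_t now_ns) {
        if (now_ns - last_ts_ns > kTickIntervalNs) {
            last_ts_ns = now_ns;
            return true;
        }
        return false;
    }
};

// Counts how many ticks fire when the loop wakes every `step_ns` from time
// 0 up to and including `total_ns`.
int countTicks(std::int64_t step_ns, std::int64_t total_ns) {
    PeriodicTick tick;
    int fired = 0;
    for (std::int64_t now = 0; now <= total_ns; now += step_ns) {
        if (tick.due(now)) ++fired;
    }
    return fired;
}
```

Because the comparison is strict and the loop only observes time when it wakes, the effective period in this model is one interval plus up to one wake-up step, which is consistent with describing the tick as approximately 1 Hz rather than exactly 1 Hz.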
10 changes: 10 additions & 0 deletions mooncake-transfer-engine/tests/CMakeLists.txt
Expand Up @@ -16,6 +16,16 @@ add_executable(transport_uint_test ${WORKSPACE}/transport_uint_test.cpp)
target_link_libraries(transport_uint_test PUBLIC transfer_engine gtest gtest_main )
add_test(NAME transport_uint_test COMMAND transport_uint_test)

add_executable(endpoint_store_test ${WORKSPACE}/endpoint_store_test.cpp)
target_link_libraries(endpoint_store_test PUBLIC transfer_engine gtest gtest_main)
add_test(NAME endpoint_store_test COMMAND endpoint_store_test)

# Integration test for the monitorWorker reclaim tick (issue #1845).
# Requires an RDMA device (rxe0, mlx5, etc.) — invoked manually, not via ctest.
add_executable(endpoint_store_integration_test ${WORKSPACE}/endpoint_store_integration_test.cpp)
target_link_libraries(endpoint_store_integration_test PUBLIC transfer_engine gtest gtest_main)
# add_test(NAME endpoint_store_integration_test COMMAND endpoint_store_integration_test)

add_executable(rdma_transport_test2 ${WORKSPACE}/rdma_transport_test2.cpp)
target_link_libraries(rdma_transport_test2 PUBLIC transfer_engine gtest gtest_main )
# add_test(NAME rdma_transport_test2 COMMAND rdma_transport_test2)
111 changes: 111 additions & 0 deletions mooncake-transfer-engine/tests/endpoint_store_integration_test.cpp
@@ -0,0 +1,111 @@
// Copyright 2026 KVCache.AI
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Integration test for #1845. Verifies the end-to-end wiring of the fix:
// WorkerPool::monitorWorker actually calls RdmaContext::reclaimEndpoints at
// ~1 Hz, causing quiescent entries in the endpoint store's waiting_list_ to
// drain without any further insertion traffic. The unit tests in
// endpoint_store_test.cpp verify the reclaim method itself; this file
// verifies that the scheduler invokes it.
//
// Requires an RDMA device. Passes on soft-RoCE (`rdma_rxe`) as well as real
// NICs. Not registered with ctest by default because CI runners don't have
// devices; invoke manually:
//
// ./build/mooncake-transfer-engine/tests/endpoint_store_integration_test
//
// Environment override: set MC_TEST_DEVICE_NAME to force a specific device;
// otherwise the first device returned by ibv_get_device_list is used.

#include <gtest/gtest.h>
#include <infiniband/verbs.h>

#include <chrono>
#include <cstdlib>
#include <memory>
#include <string>
#include <thread>

#include "config.h"
#include "transport/rdma_transport/endpoint_store.h"
#include "transport/rdma_transport/rdma_context.h"
#include "transport/rdma_transport/rdma_endpoint.h"
#include "transport/rdma_transport/rdma_transport.h"

using namespace mooncake;

namespace {

std::string pickRdmaDevice() {
    const char *override_name = std::getenv("MC_TEST_DEVICE_NAME");
    if (override_name && *override_name) return override_name;
    int num_devices = 0;
    ibv_device **list = ibv_get_device_list(&num_devices);
    if (!list) return "";
    if (num_devices == 0) {
        // Free the (empty) list so it is not leaked on the early return.
        ibv_free_device_list(list);
        return "";
    }
    std::string name = ibv_get_device_name(list[0]);
    ibv_free_device_list(list);
    return name;
}

// Build an RdmaEndPoint with no QPs and active_=false. The store's reclaim
// path only inspects hasOutstandingSlice(), which for an endpoint with empty
// qp_list_ reduces to active_. Safe to destruct because qp_list_ is empty.
std::shared_ptr<RdmaEndPoint> makeQuiescentEndpoint(RdmaContext &ctx) {
auto ep = std::make_shared<RdmaEndPoint>(ctx);
ep->set_active(false);
return ep;
}

// Verifies the full fix wiring: after construct() spawns monitorWorker, a
// quiescent entry injected into the store's waiting_list_ is drained by the
// scheduler within ~1.5 s with no further insertion traffic.
TEST(EndpointStoreIntegration, MonitorWorkerTickDrainsWaitingList) {
const std::string device = pickRdmaDevice();
ASSERT_FALSE(device.empty())
<< "no RDMA device available — integration test requires rxe0, mlx5, "
"or similar. Set MC_TEST_DEVICE_NAME to override.";

// RdmaTransport's destructor dereferences metadata_ which is null until
// init(); leak the engine to avoid touching that path.
auto *transport = new RdmaTransport();
auto context = std::make_shared<RdmaContext>(*transport, device);
auto &config = globalConfig();
int rc = context->construct(config.num_cq_per_ctx,
config.num_comp_channels_per_ctx, config.port,
config.gid_index, config.max_cqe,
/*max_endpoints=*/4);
ASSERT_EQ(rc, 0) << "RdmaContext::construct failed on device " << device;

auto *raw_store = context->endpointStore();
ASSERT_NE(raw_store, nullptr);
auto *sieve = dynamic_cast<SIEVEEndpointStore *>(raw_store);
ASSERT_NE(sieve, nullptr) << "default endpoint store should be SIEVE; "
"integration test assumes it";

sieve->testOnlyInsertWaiting(makeQuiescentEndpoint(*context));
sieve->testOnlyInsertWaiting(makeQuiescentEndpoint(*context));
sieve->testOnlyInsertWaiting(makeQuiescentEndpoint(*context));
ASSERT_EQ(sieve->waitingListSize(), 3u);

// monitorWorker's reclaim tick fires every ~1 s. Give it enough margin
// for scheduling jitter but keep the test fast.
std::this_thread::sleep_for(std::chrono::milliseconds(1500));

EXPECT_EQ(sieve->waitingListSize(), 0u)
<< "monitorWorker must call reclaimEndpoints within ~1 s. If this "
"fails, either the periodic tick in worker_pool.cpp was removed or "
"reclaim is failing on quiescent entries.";
}

} // namespace
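
The integration test above waits a fixed 1500 ms before checking the waiting list. One possible alternative, shown here only as a sketch, is a bounded poll that returns as soon as the condition holds and still fails after the same deadline. `waitUntil` is a hypothetical helper, not part of Mooncake or GoogleTest.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <thread>

// Hypothetical helper: polls `done` until it returns true or `timeout`
// elapses. Returns true if the condition was observed before the deadline.
bool waitUntil(const std::function<bool()>& done,
               std::chrono::milliseconds timeout,
               std::chrono::milliseconds poll = std::chrono::milliseconds(10)) {
    const auto deadline = std::chrono::steady_clock::now() + timeout;
    while (!done()) {
        if (std::chrono::steady_clock::now() >= deadline) return false;
        std::this_thread::sleep_for(poll);
    }
    return true;
}
```

Under that assumption, the test's final check could be expressed as waiting up to 1500 ms for `waitingListSize()` to reach zero. The pass/fail deadline stays the same, while the passing case typically finishes shortly after the first reclaim tick.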
161 changes: 161 additions & 0 deletions mooncake-transfer-engine/tests/endpoint_store_test.cpp
@@ -0,0 +1,161 @@
// Copyright 2026 KVCache.AI
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Regression coverage for #1845. Asserts that
// SIEVEEndpointStore::reclaimEndpoint drains quiescent entries from
// waiting_list_ without requiring a subsequent insertEndpoint call. This is the
// invariant the periodic-reclaim tick in monitorWorker depends on.

#include <gtest/gtest.h>

#include <memory>

#include "transport/rdma_transport/endpoint_store.h"
#include "transport/rdma_transport/rdma_context.h"
#include "transport/rdma_transport/rdma_endpoint.h"
#include "transport/rdma_transport/rdma_transport.h"

using namespace mooncake;

namespace {

// Build an RdmaEndPoint that owns zero QPs and has active_=false. construct()
// is deliberately not called — the store's reclaim logic only inspects
// hasOutstandingSlice(), which for an endpoint with empty qp_list_ returns
// whatever active_ is.
std::shared_ptr<RdmaEndPoint> makeQuiescentEndpoint(RdmaContext& ctx) {
auto ep = std::make_shared<RdmaEndPoint>(ctx);
ep->set_active(false);
return ep;
}

std::shared_ptr<RdmaEndPoint> makeActiveEndpoint(RdmaContext& ctx) {
// Default ctor leaves active_=true.
return std::make_shared<RdmaEndPoint>(ctx);
}

class EndpointStoreTest : public ::testing::Test {
protected:
// Leaked on purpose: RdmaTransport's destructor dereferences metadata_,
// which is null when the engine was never init()ed. We only need a live
// reference for RdmaContext's constructor; the engine object is otherwise
// unused by the reclaim logic under test.
RdmaTransport* transport_ = nullptr;
std::unique_ptr<RdmaContext> ctx_;

void SetUp() override {
transport_ = new RdmaTransport();
ctx_ = std::make_unique<RdmaContext>(*transport_, "unused");
}
};

// The core invariant behind #1845's fix: reclaimEndpoint must drain quiescent
// entries on its own, without needing a subsequent insertEndpoint to trigger
// it. Before the fix, reclaim ran only on insertion, so if insertions stopped
// (e.g., all peers died), waiting_list_ grew unboundedly. The periodic tick
// from monitorWorker calls this method every second; this test asserts its
// contract in isolation.
TEST_F(EndpointStoreTest, ReclaimDrainsQuiescentEntries) {
SIEVEEndpointStore store(/*max_size=*/4);

constexpr size_t kN = 10;
for (size_t i = 0; i < kN; ++i) {
store.testOnlyInsertWaiting(makeQuiescentEndpoint(*ctx_));
}
EXPECT_EQ(store.waitingListSize(), kN);

store.reclaimEndpoint();
EXPECT_EQ(store.waitingListSize(), 0u)
<< "reclaimEndpoint must drain quiescent entries with no insertion "
"prerequisite";
}

// Negative control: reclaim must leave entries in place if they still report
// outstanding slices. Ensures we didn't break the hasOutstandingSlice gate.
TEST_F(EndpointStoreTest, ReclaimLeavesActiveEntries) {
SIEVEEndpointStore store(4);

store.testOnlyInsertWaiting(makeActiveEndpoint(*ctx_));
store.testOnlyInsertWaiting(makeActiveEndpoint(*ctx_));
store.testOnlyInsertWaiting(makeQuiescentEndpoint(*ctx_));
EXPECT_EQ(store.waitingListSize(), 3u);

store.reclaimEndpoint();
EXPECT_EQ(store.waitingListSize(), 2u)
<< "reclaim should drop only the quiescent endpoint, keep the two "
"active ones";
}

TEST_F(EndpointStoreTest, ReclaimIsIdempotentWhenEmpty) {
SIEVEEndpointStore store(4);

store.reclaimEndpoint();
EXPECT_EQ(store.waitingListSize(), 0u);

store.testOnlyInsertWaiting(makeQuiescentEndpoint(*ctx_));
store.reclaimEndpoint();
EXPECT_EQ(store.waitingListSize(), 0u);

store.reclaimEndpoint(); // second call is a no-op
EXPECT_EQ(store.waitingListSize(), 0u);
}

// Demonstrates the #1845 failure mode: once insertions stop but evictions
// keep landing in the waiting list, nothing drains them without an explicit
// reclaim call. Before this fix, reclaimEndpoint ran only from insertEndpoint,
// so "many evictions, no new peers to connect to" meant waiting_list_ grew
// without bound. This test simulates that workload without any RDMA or
// scheduler; the assertion is a strict "zero reclaim calls leaves the leak
// at its peak."
TEST_F(EndpointStoreTest, LeakManifestsWithoutReclaimCall) {
SIEVEEndpointStore store(/*max_size=*/4);

constexpr size_t kEvictions = 1118; // match reporter's eviction count
for (size_t i = 0; i < kEvictions; ++i) {
store.testOnlyInsertWaiting(makeQuiescentEndpoint(*ctx_));
}

// Without a reclaim call the leak is at its peak.
EXPECT_EQ(store.waitingListSize(), kEvictions)
<< "baseline confirmation: waiting_list_ accumulates as expected";

// The fix is a 1 Hz invocation of this single method from monitorWorker.
// One call is enough to drain the entire backlog (because the entries are
// quiescent by the time the peer-death path finishes). This is the
// invariant the PR relies on.
store.reclaimEndpoint();
EXPECT_EQ(store.waitingListSize(), 0u)
<< "a single reclaim call drains the full backlog once insertions "
"stop; this is what the periodic tick in monitorWorker provides";
}

// Guards against a future regression that re-breaks the reclaim contract —
// e.g., someone changing reclaimEndpoint to no-op when endpoint_map_ is
// empty, on the incorrect assumption that reclaim only runs from
// insertEndpoint. Walking 1000 quiescent entries should still drain them.
TEST_F(EndpointStoreTest, ReclaimDoesNotRequireActiveMap) {
SIEVEEndpointStore store(4);
EXPECT_EQ(store.getSize(), 0u); // endpoint_map_ empty

for (size_t i = 0; i < 1000; ++i) {
store.testOnlyInsertWaiting(makeQuiescentEndpoint(*ctx_));
}
EXPECT_EQ(store.getSize(), 0u); // still empty
EXPECT_EQ(store.waitingListSize(), 1000u);

store.reclaimEndpoint();
EXPECT_EQ(store.waitingListSize(), 0u);
}

} // namespace