[transfer_engine] fix: drain endpoint waiting list via periodic reclaim #1952
reclaimEndpoint() is currently invoked only from RdmaContext::endpoint() after a new insertion. Under healthy load, insertions and evictions are 1:1 so this works. Under failure load -- many error completions trigger deleteEndpoint(), but new-insertion traffic stalls because the dead peer isn't generating new connection paths -- waiting_list_ grows without bound and QPs never get destroyed. Add a 1Hz reclaimEndpoints() call from monitorWorker on the existing 1-second context heartbeat. This decouples reclaim cadence from insertion traffic. See issue kvcache-ai#1845.
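The decoupling described above can be sketched with a toy model. This is a minimal illustration, not the Mooncake implementation: `ToyEndpoint`, `ToyEndpointStore`, and `monitorTick` are hypothetical names, and a plain `std::mutex` stands in for the real lock. The point is only that a heartbeat-driven reclaim drains the backlog even when no insertion ever happens.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <unordered_set>
#include <utility>

// Hypothetical stand-in for an RDMA endpoint; the real one owns QPs.
struct ToyEndpoint {
    bool has_outstanding_slice = false;
};

class ToyEndpointStore {
   public:
    // Error-completion path: park the endpoint until it is quiescent.
    void deleteEndpoint(std::shared_ptr<ToyEndpoint> ep) {
        assert(ep != nullptr);
        std::lock_guard<std::mutex> guard(lock_);
        waiting_list_.insert(std::move(ep));
    }

    // Drop every quiescent endpoint and return how many were released.
    std::size_t reclaimEndpoints() {
        std::lock_guard<std::mutex> guard(lock_);
        std::size_t reclaimed = 0;
        for (auto it = waiting_list_.begin(); it != waiting_list_.end();) {
            if (!(*it)->has_outstanding_slice) {
                it = waiting_list_.erase(it);  // last reference drops here
                ++reclaimed;
            } else {
                ++it;  // still in flight: leave it parked
            }
        }
        return reclaimed;
    }

    std::size_t waitingListSize() const {
        std::lock_guard<std::mutex> guard(lock_);
        return waiting_list_.size();
    }

   private:
    mutable std::mutex lock_;
    std::unordered_set<std::shared_ptr<ToyEndpoint>> waiting_list_;
};

// One heartbeat of a hypothetical monitor loop. Reclaim runs on every tick,
// independent of whether any new endpoint was inserted.
inline std::size_t monitorTick(ToyEndpointStore &store) {
    return store.reclaimEndpoints();
}
```

In this model, parking 1118 quiescent endpoints and then calling `monitorTick` once empties the list, while an endpoint with an outstanding slice stays parked until a later tick.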
…i#1845

Adds unit + integration coverage for the periodic reclaim fix.

endpoint_store_test (5 tests, no RDMA device, runs under ctest):
- reclaim drains quiescent entries on its own
- reclaim leaves active entries alone (gate preserved)
- reclaim is idempotent when empty
- leak manifests without reclaim call (1118-eviction mirror of reporter)
- reclaim works without active map (guard against insert/reclaim coupling)

endpoint_store_integration_test (requires RDMA device, not auto-registered):
- Verifies WorkerPool::monitorWorker actually calls reclaimEndpoints at ~1 Hz by constructing a real RdmaContext and waiting for the tick to drain injected entries. Confirms the end-to-end fix wiring.

Supporting changes:
- EndpointStore::waitingListSize() accessor (diagnostics + tests)
- SIEVEEndpointStore::testOnlyInsertWaiting() for test injection
- RdmaContext::endpointStore() accessor (diagnostics + tests)
- design/transfer-engine: add a sentence to Endpoint Management explaining that waiting_list_ drains both on insertion and on the monitorWorker heartbeat, so accumulated reclaim does not stall under failure load.
- troubleshooting: extend the "Failed to create QP: Cannot allocate memory" entry with a bullet pointing at issue kvcache-ai#1845 so operators seeing the symptom find the cause and the fix.
Code Review
This pull request addresses a resource leak (issue #1845) where RDMA endpoints and Queue Pairs (QPs) accumulated during peer failures because reclamation was only triggered by new insertions. The fix introduces a periodic 1 Hz reclamation tick within the monitorWorker thread to ensure the waiting_list_ is drained even when insertions stall. The changes include updates to documentation, the addition of a waitingListSize metric for observability, and new unit and integration tests. Review feedback highlighted a thread-safety concern in FIFOEndpointStore::waitingListSize(), where accessing the size of the waiting_list_ set without synchronization could lead to undefined behavior.
    int disconnectQPs() override;

    size_t getTotalQPNumber() override;
    size_t waitingListSize() const override { return waiting_list_.size(); }
The implementation of waitingListSize() in FIFOEndpointStore is not thread-safe. Accessing waiting_list_.size() on a std::unordered_set without a lock while other threads might be concurrently modifying the set (e.g., in deleteEndpoint, evictEndpoint, or reclaimEndpoint) leads to undefined behavior.
To resolve this, consider adding an atomic counter waiting_list_len_ to FIFOEndpointStore, similar to the implementation in SIEVEEndpointStore. This would allow for a lock-free and thread-safe size check. Alternatively, you could use a ReadGuard with the endpoint_map_lock_, though this would require making the lock mutable to be used within this const method.
…atomic counter Per PR kvcache-ai#1952 review: FIFO variant returned waiting_list_.size() on std::unordered_set without holding endpoint_map_lock_, racing concurrent modification. Mirror the SIEVE pattern with an atomic waiting_list_len_ incremented in delete/evict, decremented in reclaim.
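A minimal sketch of the counter pattern this commit describes, under stated assumptions: `CountedWaitingList` is a hypothetical class, integer ids replace endpoint pointers, and `std::mutex` replaces `endpoint_map_lock_`. The set is only touched under the lock, while the size getter reads a separate atomic and never races with set mutation.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <mutex>
#include <unordered_set>

// Toy waiting list keyed by integer ids (an assumption for brevity).
class CountedWaitingList {
   public:
    // Mirrors the delete/evict paths: insert under the lock, bump the counter.
    void add(int id) {
        std::lock_guard<std::mutex> guard(lock_);
        if (waiting_list_.insert(id).second) {
            waiting_list_len_.fetch_add(1, std::memory_order_relaxed);
        }
    }

    // Mirrors reclaim: drain under the lock, decrement the counter to match.
    void reclaimAll() {
        std::lock_guard<std::mutex> guard(lock_);
        // Invariant while the lock is held: counter equals the set size.
        assert(waiting_list_len_.load(std::memory_order_relaxed) ==
               waiting_list_.size());
        waiting_list_len_.fetch_sub(waiting_list_.size(),
                                    std::memory_order_relaxed);
        waiting_list_.clear();
    }

    // Lock-free getter: reads only the atomic, never the unordered_set.
    std::size_t waitingListSize() const {
        return waiting_list_len_.load(std::memory_order_relaxed);
    }

   private:
    std::mutex lock_;
    std::unordered_set<int> waiting_list_;
    std::atomic<std::size_t> waiting_list_len_{0};
};
```

The value returned by the getter may be slightly stale relative to a concurrent writer, which is usually acceptable for diagnostics and tests.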
… LSAN CI build (3.10/3.12) runs with -DENABLE_ASAN=ON and LSAN flagged the 5 × 288 byte allocation the test fixture intentionally leaks (~RdmaTransport dereferences a null metadata_ unless install() ran). Gate on __SANITIZE_ADDRESS__ / __has_feature and mark the pointer with __lsan_ignore_object so real leaks are still caught.
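The gating described in this commit could look roughly like the following. It is a sketch, not the code from the PR: the `TOY_HAS_ASAN` macro and the `leakOnPurpose` helper are hypothetical names. `__lsan_ignore_object` is declared in `<sanitizer/lsan_interface.h>`, which is only included when a sanitizer build is detected, so the helper compiles to a no-op otherwise.

```cpp
#include <cassert>

// GCC defines __SANITIZE_ADDRESS__ under -fsanitize=address; Clang exposes
// the same information through __has_feature(address_sanitizer).
#if defined(__SANITIZE_ADDRESS__)
#define TOY_HAS_ASAN 1
#elif defined(__has_feature)
#if __has_feature(address_sanitizer)
#define TOY_HAS_ASAN 1
#endif
#endif

#ifndef TOY_HAS_ASAN
#define TOY_HAS_ASAN 0
#endif

#if TOY_HAS_ASAN
#include <sanitizer/lsan_interface.h>
#endif

// Hypothetical helper: mark one allocation as intentionally leaked so that
// LeakSanitizer skips it while still reporting every other leak.
template <typename T>
T *leakOnPurpose(T *ptr) {
    assert(ptr != nullptr);
#if TOY_HAS_ASAN
    __lsan_ignore_object(ptr);
#endif
    return ptr;
}
```

Marking a single pointer keeps the suppression narrow, in contrast to disabling leak detection for the whole test binary.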
    size_t waitingListSize() const override {
        return waiting_list_len_.load(std::memory_order_relaxed);
    }
waiting_list_len_ is declared as atomic<int> but waitingListSize() returns size_t.
Good catch. I promoted the counter to atomic<size_t> in both FIFO and SIEVE. This is now a clean pass-through getter.
    // insertions are happening. Without this, reclaim only runs
    // from RdmaContext::endpoint() and the waiting list grows
    // unboundedly under failure load. See issue #1845.
    context_.reclaimEndpoints();
Previously, reclaimEndpoint() was only called from within insertEndpoint(), which held endpoint_map_lock_. Now it is called without the lock. Please confirm that reclaimEndpoint()'s lock contract hasn't changed in behavior.
The reclaimEndpoint() lock contract has not changed in behavior:
- `reclaimEndpoint()` still self-locks using `WriteGuard(endpoint_map_lock_)`.
- The existing caller `rdma_context.cpp:355-356` already had the `insertEndpoint` lock freed by its destructor.
- The new caller in `monitorWorker()` observes the same contract as it holds no `EndpointStore` locks.
- `insertEndpoint()` definitions in FIFO, SIEVE, and UB self-lock and only call `evictEndpoint()`, which is caller-locked. None call `reclaimEndpoint()`.
I added a comment to make this contract clear to future contributors.
    // Raw accessor for the endpoint cache. Null before construct() runs.
    // Used by integration tests that need to observe waiting_list_ directly.
    EndpointStore *endpointStore() const { return endpoint_store_.get(); }
This function would expose a raw EndpointStore* pointer. If it were called for other purposes in the future (not just for testing), would it be safe?
Yes, I had some concerns about this as well. I decided to remove this pointer and only add the necessary methods to RdmaContext: waitingListSize() and testOnlyInsertWaiting(std::shared_ptr<RdmaEndPoint> ep).
- `waitingListSize()` returns by value, so there is no pointer to stash.
- `testOnlyInsertWaiting()` takes a `shared_ptr` by value. This removes the raw pointer access across the API boundary.
- `testOnlyInsertWaiting()` is lifted as a virtual function on the `EndpointStore` base interface. This has the added benefit of making the integration test no longer need `dynamic_cast` to `SIEVEEndpointStore`.
waitingListSize() returns size_t but the underlying counter was atomic<int>, which quietly narrowed on load. Promote to atomic<size_t> in both FIFO and SIEVE so the getter is a clean pass-through with no implicit conversion.
…erface monitorWorker now calls reclaimEndpoint() via RdmaContext; it already acquired endpoint_map_lock_ internally, but nothing declared that. Codify the precondition on the base so future callers know not to hold the lock. RWSpinlock is non-reentrant, so recursive acquisition would deadlock.
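The precondition can be illustrated with a toy non-reentrant lock. This is a sketch under assumptions: `ToySpinlock` stands in for `RWSpinlock`, and `SelfLockingStore` with its `lockForDemo()` accessor is hypothetical. A method that takes the lock itself must be called with the lock released, because a second blocking acquire on the same thread would spin forever.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <mutex>

// Toy non-reentrant spinlock (an assumption standing in for RWSpinlock).
class ToySpinlock {
   public:
    bool try_lock() {
        return !locked_.exchange(true, std::memory_order_acquire);
    }
    void lock() {
        while (!try_lock()) {
            // Spin. If the current thread already holds the lock,
            // this loop never terminates.
        }
    }
    void unlock() {
        assert(locked_.load(std::memory_order_relaxed));  // must be held
        locked_.store(false, std::memory_order_release);
    }

   private:
    std::atomic<bool> locked_{false};
};

class SelfLockingStore {
   public:
    void park(std::size_t n) {
        std::lock_guard<ToySpinlock> guard(lock_);
        pending_ += n;
    }

    // Precondition: the caller must NOT hold the store lock.
    // The method acquires it internally.
    std::size_t reclaimEndpoint() {
        std::lock_guard<ToySpinlock> guard(lock_);
        std::size_t drained = pending_;
        pending_ = 0;
        return drained;
    }

    // Exposed only so the demo can show the lock is not reentrant.
    ToySpinlock &lockForDemo() { return lock_; }

   private:
    ToySpinlock lock_;
    std::size_t pending_ = 0;
};
```

Calling `try_lock()` while the lock is already held returns false, which is the non-blocking way to observe what would otherwise be a self-deadlock inside `reclaimEndpoint()`.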
…rface Previously exposed a raw EndpointStore* via RdmaContext::endpointStore() for the integration test. A raw pointer is easy to misuse outside of tests and couples the caller to the concrete store via dynamic_cast. Replace with two narrow methods on RdmaContext: waitingListSize() (value return) and testOnlyInsertWaiting(shared_ptr<RdmaEndPoint>). The latter is lifted onto the EndpointStore base interface and implemented on both FIFO and SIEVE, so the integration test no longer downcasts.
… ctest Integration test was previously unregistered and invoked manually. Now self-skips via GTEST_SKIP when no RDMA device is present, so it runs cleanly on CI runners without RDMA (skips) and on rxe/mlx5 hosts (executes). Labeled "rdma" for ctest -L filtering.
…is empty monitorWorker now drives reclaim at ~1 Hz regardless of activity. On FIFO this grabbed endpoint_map_lock_ as WriteGuard every tick even in the common steady-state case where waiting_list_ is empty. Add the same counter-check short-circuit SIEVE already has.
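A rough sketch of the short-circuit this commit describes, with hypothetical names throughout (`FastPathStore`, `lockedReclaims`) and `std::mutex` in place of the real write guard. The periodic tick checks the atomic length first and takes the lock only when there is something to drain. The `locked_reclaims_` field exists purely so the effect is observable in a single-threaded demo.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <mutex>
#include <vector>

class FastPathStore {
   public:
    void park(int id) {
        std::lock_guard<std::mutex> guard(lock_);
        waiting_list_.push_back(id);
        waiting_list_len_.fetch_add(1, std::memory_order_relaxed);
    }

    // Called on every heartbeat tick.
    std::size_t reclaim() {
        // Fast path: nothing parked, so skip the lock entirely.
        if (waiting_list_len_.load(std::memory_order_relaxed) == 0) {
            return 0;
        }
        std::lock_guard<std::mutex> guard(lock_);
        ++locked_reclaims_;
        assert(waiting_list_len_.load(std::memory_order_relaxed) ==
               waiting_list_.size());
        std::size_t drained = waiting_list_.size();
        waiting_list_.clear();
        waiting_list_len_.store(0, std::memory_order_relaxed);
        return drained;
    }

    // Demo-only: how many times reclaim() actually took the lock.
    std::size_t lockedReclaims() const { return locked_reclaims_; }

   private:
    std::mutex lock_;
    std::vector<int> waiting_list_;
    std::atomic<std::size_t> waiting_list_len_{0};
    std::size_t locked_reclaims_ = 0;  // written only under lock_
};
```

An entry parked concurrently with the relaxed check may be missed by that tick, but the next tick picks it up, which fits a best-effort 1 Hz cadence.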
…ruct fails GHA ubuntu-22.04 runners enumerate a phantom mlx5_0 via ibv_get_device_list without a working port/GID, so pickRdmaDevice() returns a non-empty name and the earlier GTEST_SKIP on empty device list doesn't fire. Then construct() fails with ERR_CONTEXT and the hard ASSERT_EQ fails the test. Convert the assertion to a GTEST_SKIP on construct failure. Matches the "attempt setup, skip on failure" convention used elsewhere in the repo (e.g., client_local_hot_cache_test.cpp:794-799).
Description
This closes #1845. Under SGLang PD-disaggregated prefill, `ibv_create_qp` eventually fails with `Cannot allocate memory` and `rdma resource show` reports >20K QPs per NIC, preceded by 1118 endpoint-evicted events.

`SIEVEEndpointStore::reclaimEndpoint` drains `waiting_list_` of quiescent (a.k.a. evicted and deleted) endpoints so their `shared_ptr`s drop and their destructors run `ibv_destroy_qp`. The only production caller was `RdmaContext::endpoint()`, so reclaim piggybacked on new endpoint insertions. Under healthy load, insertions and evictions are coupled, so the list drains in step. Under failure load, such as when peers die or retries are exhausted, evictions and deletions continue while insertions stop. Reclaim then stops firing and `waiting_list_` grows unboundedly. Each accumulated endpoint pins `num_qp_per_ep` QPs against the NIC's pool (1118 x 2..4 ~= enough to exhaust). The reporter's 4x QP asymmetry across NIC groups (22k on `mlx5_00-03` vs. 6k on `mlx5_04-07`) is consistent with this: eviction load concentrated on the NICs routing to the failing peer.

To fix this, I added a `context_.reclaimEndpoints()` call inside `WorkerPool::monitorWorker`, on the existing `last_reset_ts` heartbeat. The result is that reclaim no longer depends on insertion traffic. The change is minimal: one line of behavior change in `worker_pool.cpp` and a thin wrapper method, `RdmaContext::reclaimEndpoints()`, that delegates to the store.

I initially also tried adding an eager `disconnect()` to `deleteEndpoint`/`evictEndpoint`, but it crashes on `rxe` within ~5s with `malloc(): unaligned tcache chunk detected`. I think the `slice->rdma.qp_depth` raw pointer into `wr_depth_list_` becomes dangling when reclaim destroys the endpoint while an in-flight slice still holds it. Fixing that properly requires a shared-ownership refactor of `wr_depth_list_`, which I could do as a follow-up so eager disconnect can be re-added safely.

Module
- `mooncake-transfer-engine`
- `mooncake-store`
- `mooncake-ep`
- `mooncake-integration`
- `mooncake-p2p-store`
- `mooncake-wheel`
- `mooncake-pg`
- `mooncake-rl`

Type of Change
How Has This Been Tested?
There are 5 new unit tests:
- `ReclaimDrainsQuiescentEntries` is the base method contract.
- `ReclaimLeavesActiveEntries` tests that the `hasOutstandingSlice` gate is preserved.
- `ReclaimIsIdempotentWhenEmpty` tests cadence safety.
- `LeakManifestsWithoutReclaimCall` reproduces the reporter's 1118-eviction shape and asserts a single reclaim call drains the full backlog.
- `ReclaimDoesNotRequireActiveMap` guards against a regression that conflates reclaim with insert.

There is 1 new integration test:

`endpoint_store_integration_test` constructs a real `RdmaContext`, spawns `monitorWorker`, injects quiescent entries into `waiting_list_`, sleeps, and asserts drainage. I validated the fix by bisection: with the new `reclaimEndpoints()` call removed, this test fails with the expected error message, and with the call restored it passes again.

I ran the full ctest suite. There is one existing failure: `hot_standby_service_test`. I think this is simply due to `etcd` not being available on my system. It also fails on main, and the code doesn't seem related at all, so I'm flagging this as environmental. Please let me know if you'd like me to take a closer look.

I don't have the hardware to match the original SGLang mlx5 cluster conditions, unfortunately. If you are aware of on-demand spot instances for mlx5, please let me know and I can run this patch there.
This is the test script I used to manually reproduce:
Checklist
`./scripts/code_format.sh` before submitting.