[PG] Add rank fault tolerance and degraded recovery#2182
Conversation
Detect failed active peers with out-of-band liveness probes, deactivate them for survivor collectives, and strengthen elastic recovery tests for CPU and CUDA/RDMA paths.
There was a problem hiding this comment.
Code Review
This pull request introduces a lazy initialization mechanism for the Mooncake backend, an out-of-band liveness probe for the connection poller, and a runtime interface to update active ranks. The changes include refactoring the MooncakeBackend to support deferred resource allocation, adding health check logic to ConnectionContext, and implementing retry logic for failed transfers in the MooncakeWorker. Feedback highlights several concurrency and robustness issues: the use of non-atomic booleans in a double-checked locking pattern for initialization, a data race on the static backendIndex_, and thread-safety concerns regarding direct at::Tensor modifications from the worker thread. Additionally, the reviewer noted that the 100-microsecond ping timeout is likely too aggressive and that the new retry logic for sync operations lacks a maximum retry limit or backoff mechanism.
| if (resourcesInitialized_) { | ||
| return; | ||
| } |
There was a problem hiding this comment.
The double-checked locking pattern used here for resourcesInitialized_ is not thread-safe because resourcesInitialized_ is a plain bool. In C++, this can lead to undefined behavior due to instruction reordering or memory visibility issues. Please change resourcesInitialized_, resourcesStarted_, initializationFailed_, and isShutdown_ to std::atomic<bool> to ensure correct synchronization.
|
|
||
| globalRanksInGroup_.assign(distBackendOpts.global_ranks_in_group.begin(), | ||
| distBackendOpts.global_ranks_in_group.end()); | ||
| instanceBackendIndex_ = backendIndex_++; |
There was a problem hiding this comment.
|
|
||
| group->peerConnected[peerRank] = false; | ||
| group->activeRanks[peerRank] = false; | ||
| group->activeRanksTensor[peerRank] = 0; |
There was a problem hiding this comment.
Modifying activeRanksTensor directly from the worker thread is not thread-safe. at::Tensor operations are not guaranteed to be atomic or thread-safe against concurrent reads or writes from other threads (like the main thread or the connection poller). Additionally, if the tensor is on CUDA, this operation might have side effects on the current CUDA context or stream of the worker thread. It is safer to update the bool* activeRanks array and signal the main thread or poller to update the tensor consistently.
| PeerLiveness::Alive)) { | ||
| LOG(ERROR) | ||
| if ((status.s == TransferStatusEnum::FAILED || | ||
| diff.count() > kPingTimeoutMicroseconds_) && |
There was a problem hiding this comment.
The timeout kPingTimeoutMicroseconds_ is set to 100 microseconds, which is extremely aggressive for network transfers. This will likely cause frequent and unnecessary calls to probePeerAliveByID in the high-performance worker loop, potentially impacting throughput. Consider increasing this threshold or implementing a backoff mechanism for liveness probes in the worker thread.
| } else if (status.s == TransferStatusEnum::FAILED) { | ||
| LOG(WARNING) | ||
| << "Rank " << group->rank | ||
| << " retrying sync to alive peer " << j | ||
| << " during op " << (int)task.opType; | ||
| freeBatchID(group, task.batchID); | ||
|
|
||
| for (size_t peer = 0; peer < kMaxNumRanks; | ||
| ++peer) { | ||
| rankToTaskId[i][peer] = kInvalidTaskId; | ||
| } | ||
| auto retry_source_ptr = | ||
| (int32_t*)group->segmentInfos[group->rank] | ||
| .send_sync[task.bufferOffset]; | ||
| std::vector<TransferRequest> entries; | ||
| for (int peer = 0; peer < group->size; ++peer) { | ||
| if (!group->activeRanks[peer]) { | ||
| continue; | ||
| } | ||
| *retry_source_ptr = 1; | ||
| rankToTaskId[i][peer] = entries.size(); | ||
| entries.push_back(TransferRequest{ | ||
| .opcode = TransferRequest::WRITE, | ||
| .source = (void*)retry_source_ptr, | ||
| .target_id = group->segmentIDs[peer], | ||
| .target_offset = | ||
| group->segmentInfos[peer] | ||
| .recv_sync[task.bufferOffset] + | ||
| group->rank * sizeof(int32_t), | ||
| .length = sizeof(int32_t), | ||
| }); | ||
| } | ||
| task.batchID = group->engine->allocateBatchID( | ||
| entries.size()); | ||
| group->engine->submitTransfer(task.batchID, | ||
| entries); | ||
| activeTime[i] = clock::now(); | ||
| task_status[i].store(SIGNALED_1, | ||
| std::memory_order_release); | ||
| task_done = false; | ||
| break; | ||
| } else { |
There was a problem hiding this comment.
The retry logic for failed sync operations lacks a retry limit or exponential backoff. If a peer is reported as 'Alive' by the engine but consistently fails transfers (e.g., due to a persistent configuration mismatch), the worker thread will enter a tight loop of re-submissions, consuming significant CPU and potentially masking the underlying issue. Consider adding a maximum retry count before marking the peer as broken.
|
Codecov Report✅ All modified and coverable lines are covered by tests. 📢 Thoughts on this report? Let us know! |
caozhanhao
left a comment
There was a problem hiding this comment.
Thanks for the effort on this! The overall goal makes sense, but I have several concerns about the current approach. I've left some comments inline, and I think a few parts will need to be adjusted to make sure we do not introduce side effects.
633d54f to
68e9a5d
Compare
|
cc @caozhanhao |
|
cc @caozhanhao @yuechen-sys The CI failure in the Rust test step is not caused by our code changes. All 8 Rust unit tests pass successfully; the failure is a known LeakSanitizer (LSan) compatibility issue in the GitHub Actions container environment (Tracer caught signal 11: LeakSanitizer has encountered a fatal error). LSan does not work correctly under ptrace, which is a CI infrastructure limitation rather than a code defect. The minimal_smoke tests themselves are green. |
caozhanhao
left a comment
There was a problem hiding this comment.
Thanks for the updates! PG did have some bugs with TcpTransport previously, but it seems these changes also broke the RDMA path. In my environment, TestMooncakePGElasticCUDA.test_failed_rank is hanging.
By the way, since this PR's base lies between #2066 and #2192, setting environment variable WITH_NVIDIA_PEERMEM=1 may be necessary on some machines to rule out external factors.
caozhanhao
left a comment
There was a problem hiding this comment.
Thanks for the follow-up! The implementation looks much cleaner now, with only a few minor issues left to address.
…onnected, simplify worker thread
|
LGTM. Thanks for your contribution! |
|
cc @yuechen-sys |
Description
Fixes #2157
This PR adds rank-level fault tolerance for Mooncake PG. Previously, failed peers were mostly detected through data-plane collective failures. That made clean exits, hard kills, and degraded survivor continuation unreliable because healthy ranks could keep waiting for a peer that was no longer reachable.
The new flow moves failure detection into the PG connection layer:
ConnectionPollerperiodically probes connected active peers out-of-band.peerConnected, deactivating theiractiveRanksentry, deleting stale store metadata, resetting P2P state, and returning the peer state machine to wait for replacement metadata.activeRanksto skip inactive peers, so survivors can keep running degraded collectives without submitting transfers or sync operations to failed ranks.recover_ranks()/join_group()path and become active again through the synchronized active-rank mask.The PR also strengthens PG elasticity and recovery coverage, including manual active-rank masking, clean-exit fault detection, gated SIGKILL fault detection, degraded survivor collectives, and replacement recovery on both CPU and CUDA/RDMA paths.
Module
mooncake-transfer-engine)mooncake-store)mooncake-ep)mooncake-integration)mooncake-p2p-store)mooncake-wheel)mooncake-pg)mooncake-rl)Type of Change
How Has This Been Tested?
Tested in the
zhouyuhan-gpu-rdmacontainer with CUDA/RDMA enabled.CUDA/RDMA environment:
Focused CUDA/RDMA smoke test:
Result: passed.
CUDA init functional regression:
Result:
CUDA elastic and fault-tolerance regression:
Result:
The skipped test is the gated SIGKILL fault-injection test, which was run separately.
CUDA/RDMA gated SIGKILL fault detection:
Result: passed.
CPU gated SIGKILL fault detection:
Result: passed.
Combined CPU + CUDA PG regression:
Result:
The two skipped tests are the default-disabled CPU/CUDA SIGKILL tests. Both were explicitly run with
MOONCAKE_PG_ENABLE_KILL9_TESTS=1and passed.Checklist
./scripts/code_format.shbefore subm