
[Feature Request]: Mooncake PG Fault-Tolerance: Diagnosis & Improvement Plan #2157

@Dayuxiaoshui

Describe your feature request

1. Experiment Setup

Environment: Docker container zhouyuhan-gpu on 8× NVIDIA A100-SXM4-80GB, PyTorch 2.4.0+cu121, CUDA 12.6, TCP transport (no RDMA in container).

Experiments conducted:

  1. kill -9 test: 4 ranks; rank 1 is killed via SIGKILL while the allreduce loop is running.
  2. os._exit(0) test: the existing test_failed_rank from test_pg_elastic.py (clean exit).

2. Experiment Results

2.1 kill -9 Fault Injection (SIGKILL)

Ranks: [0, 1(broken), 2, 3], TCP transport
kill -9 PID=51107 (rank 1)

Observed behavior:

| Time | Event |
| --- | --- |
| t+0s | kill -9 sent to rank 1 |
| t+2s | Rank 0: "marking peer 1 as broken during transferring op 1" (CORRECT) |
| t+2s | Rank 2: "marking peer 0 as broken during transferring op 1" (FALSE POSITIVE) |
| t+2s | Rank 3: "marking peer 0 as broken during transferring op 1" (FALSE POSITIVE) |
| t+60s | Timeout: survivors did not continue collectives |

Detection result: 0/3 survivors detected the failure via get_peer_state().
Continue result: 0/3 survivors continued allreduce after kill -9.
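
The tally of correct versus false-positive markings can be recomputed from the log lines. The sketch below is a hypothetical helper, not part of Mooncake: it assumes the message format shown above and labels each marking by comparing the marked peer with the rank that was actually killed.

```python
import re

# Assumed message format, taken from the log lines quoted in this issue.
# The optional colon and quote cover both the raw log and the table rendering.
MARK_RE = re.compile(r'Rank (\d+):?\s*"?marking peer (\d+) as broken')


def classify_markings(log_lines, killed_rank):
    """Split markings into (correct, false_positive) lists of (reporter, peer)."""
    correct, false_positive = [], []
    for line in log_lines:
        match = MARK_RE.search(line)
        if match is None:
            continue  # not a peer-marking message
        reporter, peer = int(match.group(1)), int(match.group(2))
        if peer == killed_rank:
            correct.append((reporter, peer))
        else:
            false_positive.append((reporter, peer))
    return correct, false_positive


# The three markings observed in the kill -9 experiment (rank 1 was killed).
observed = [
    "Rank 0 marking peer 1 as broken during transferring op 1",
    "Rank 2 marking peer 0 as broken during transferring op 1",
    "Rank 3 marking peer 0 as broken during transferring op 1",
]
correct, false_positive = classify_markings(observed, killed_rank=1)
```

With the three observed lines, one marking is correct and two are false positives, which is the 2/3 figure reported in the metrics table.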

2.2 os._exit(0) Clean Exit

Ranks: [0, 1(broken), 2, 3], TCP transport
rank 1 calls os._exit(0)

Observed behavior:

E tcp_transport.cpp:720] getConnection failed to 127.0.0.1:16596. Error: Connection refused
E tcp_transport.cpp:898] startTransfer failed to get connection to 127.0.0.1:16596
Spawn timed out after 30.0 seconds (FAIL)

Result: Even with a clean exit, survivors cannot continue collectives, because the TransferEngine still attempts to connect to the dead rank's TCP port and the connection is refused ("Connection refused").

2.3 Key Error Patterns

From mooncake_worker_thread.cpp:199:

LOG(ERROR) << "Rank " << group->rank << " marking peer " << j
           << " as broken during transferring op " << (int)task.opType;

The peer marking code at lines 192-209:

if (status.s == TransferStatusEnum::FAILED ||
    (j != group->rank &&
     diff.count() > kPingTimeoutMicroseconds_ &&
     group->engine->probePeerAliveByID(group->segmentIDs[j]) != PeerLiveness::Alive)) {
    // Set peerConnected to notify the connection poller to reconnect it.
    group->peerConnected[j] = false;
    group->activeRanks[j] = false;
    group->activeRanksTensor[j] = 0;
}

3. Root Cause Analysis

3.1 Problem: Connection Layer Ignores activeRanks

The activeRanks masking is checked in the gather phase of collectives (lines 184-188 of mooncake_worker_thread.cpp), but the connection establishment in TcpTransport::startTransfer tries to open connections to all ranks including inactive ones.

Flow for allreduce after rank failure:
  1. MooncakeWorker reads meta_->activeRanks[broken_rank] = false
  2. Skips the broken rank in the gather loop (j++)
  3. BUT: TcpTransport::startTransfer tries connect() to broken_rank's IP:port
  4. Connection refused → Transfer FAILED → triggers marking SURVIVORS as broken

3.2 Problem: Cascading False Positives

When rank 1 is killed:

  1. Rank 0's worker thread has an in-flight transfer to rank 1. That transfer fails, so rank 0 sets peerConnected[1] = false. This marking is correct.
  2. Meanwhile, rank 0's listen socket may also be disrupted (shared TCP stack, EPIPE propagation).
  3. Rank 2's transfer to rank 0 then fails as well, likely because rank 0 is busy handling the rank 1 failure.
  4. Rank 2 sets activeRanks[0] = false. This is a false positive, and it cascades: a live rank is now excluded from the group.

The root cause is that the fault detection uses the same channel (transfer operation result) for both:

  • Actual peer death (connection reset by peer)
  • Transient network issues (timeout, busy peer, EPIPE)
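
One way to separate the two cases is to classify the error before deciding whether to mark a peer. The sketch below is illustrative only: the errno-to-category mapping and both function names are assumptions, not Mooncake behavior. Errors that suggest the peer is gone mark it immediately, while possibly transient errors must repeat before the peer is marked.

```python
import errno

# Assumed mapping, for illustration only.
PEER_GONE = {errno.ECONNRESET, errno.ECONNREFUSED}
TRANSIENT = {errno.ETIMEDOUT, errno.EPIPE, errno.EAGAIN, errno.EINTR}


def classify_transfer_error(err):
    """Map an errno value to 'peer_gone', 'transient', or 'unknown'."""
    if err in PEER_GONE:
        return "peer_gone"
    if err in TRANSIENT:
        return "transient"
    return "unknown"


def should_mark_broken(err, consecutive_failures, threshold=3):
    """Mark at once on a peer-gone error; otherwise only after repeated failures."""
    if classify_transfer_error(err) == "peer_gone":
        return True
    # Transient and unknown errors need `threshold` failures in a row.
    return consecutive_failures >= threshold
```

Under this rule, a single EPIPE from a busy peer would not remove it from activeRanks, while a connection reset from a dead rank still would.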

3.3 Problem: No Out-of-Band Health Check

The only health check mechanism is embedded in transfer operations (probePeerAliveByID). There is no independent heartbeat or health monitoring thread that runs on a separate control channel. This means:

  • Health status is only updated when a transfer operation is active
  • A dead rank can only be detected when someone tries to talk to it
  • The detection is conflated with the data plane

3.4 Problem: No Graceful Degradation Protocol

When a rank fails, there is no protocol for survivors to:

  1. Agree on the new active rank set (consensus/barrier)
  2. Reset connection state to exclude the dead rank
  3. Resume collectives with the reduced set

The existing recoverRanks() protocol assumes a replacement rank will join, rather than supporting pure degradation.

4. Proposed Improvements

4.1 Phase 1: Connection-Aware activeRanks Filtering (P0)

Problem: TransferEngine attempts connections to inactive ranks.

Fix: Add an activeRanks check in TcpTransport::startTransfer and the RDMA equivalent, so that connections are never attempted to inactive ranks.

// In TcpTransport::startTransfer (pseudocode):
if (!meta_->activeRanks[targetRank]) {
    // Inactive rank: skip it before any connect() is attempted.
    return;
}

Expected outcome: Survivors can continue collectives after os._exit(0) since connections to dead ranks are never attempted.
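
The filtering rule itself is small. As a minimal sketch in Python (the function name is hypothetical, and the real check would live in the C++ transport), the set of peers a rank may connect to is every active rank other than itself:

```python
def select_transfer_targets(rank, active_ranks):
    """Return the peers that `rank` may connect to: active and not itself."""
    return [j for j, active in enumerate(active_ranks) if active and j != rank]


# Rank 1 has been marked inactive, so rank 0 never tries to connect to it.
active_ranks = [True, False, True, True]
targets_for_rank0 = select_transfer_targets(0, active_ranks)
```

Applying the filter before connection setup, rather than only in the gather loop, is what prevents the "Connection refused" path from being reached.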

4.2 Phase 2: Out-of-Band Health Check (P0)

Problem: Health check is embedded in data-plane transfers.

Fix: Add a dedicated health check thread with a separate control channel (gRPC or lightweight heartbeat).

Heartbeat thread (every 1s):
  for each peer in activeRanks:
    send heartbeat via control channel (not data channel)
    if N consecutive failures:
      mark peer as DEAD (separate from "connection broken")
      propose rank removal to other survivors

Key design decisions:

  • Separate control channel (not shared with data transfers) to avoid false positives
  • Configurable failure threshold (N consecutive failures before marking DEAD)
  • Consensus among survivors before finalizing rank removal
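
The consecutive-failure rule from the pseudocode above can be sketched as a small state tracker. This is an illustration under assumptions: the class name, the probe callback, and the default threshold are hypothetical, and the control channel itself is left to the caller.

```python
class HeartbeatMonitor:
    """Tracks consecutive heartbeat failures per peer (illustrative sketch)."""

    def __init__(self, peers, failure_threshold=3):
        self.failure_threshold = failure_threshold
        self.consecutive_failures = {peer: 0 for peer in peers}
        self.dead = set()

    def record(self, peer, ok):
        """Record one heartbeat result; return True if the peer is newly DEAD."""
        if peer in self.dead:
            return False
        if ok:
            # Any success resets the counter, so isolated misses are tolerated.
            self.consecutive_failures[peer] = 0
            return False
        self.consecutive_failures[peer] += 1
        if self.consecutive_failures[peer] >= self.failure_threshold:
            self.dead.add(peer)
            return True
        return False

    def tick(self, probe):
        """Run one round; probe(peer) -> bool is the caller's control-channel ping."""
        newly_dead = []
        for peer in sorted(self.consecutive_failures):
            if peer not in self.dead and self.record(peer, probe(peer)):
                newly_dead.append(peer)
        return newly_dead
```

A background thread would call tick() once per interval and pass any newly dead peers to the removal proposal step. Keeping the counter separate from transfer results is what decouples detection from the data plane.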

4.3 Phase 3: Graceful Degradation (P1)

Problem: No protocol for survivors to degrade without a replacement.

Fix: Implement a 3-phase degradation protocol:

Phase 1 - Detection:
  - Health check thread detects dead rank
  - Sets peerConnected[deadRank] = false, activeRanks[deadRank] = false
  - Local-only change, no consensus yet

Phase 2 - Consensus:
  - Survivors run a barrier (skip dead rank, use remaining ranks)
  - All agree on new activeRanks mask
  - Barrier also serves as a "are we all alive?" check

Phase 3 - Continuation:
  - All survivors reset connection state
  - Resume collectives with reduced activeRanks
  - Report reduced world_size to application layer
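
The consensus step needs a rule for merging the survivors' local views. The issue does not specify one, so the sketch below assumes a simple rule for illustration: a rank stays active only if it took part in the exchange and a strict majority of participants still see it as alive. The function name and the example views are hypothetical.

```python
def agree_on_active_ranks(local_views):
    """Merge per-rank views into one mask.

    local_views maps each participating rank to its local activeRanks view
    (a list of booleans indexed by rank).
    """
    participants = sorted(local_views)
    world_size = len(local_views[participants[0]])
    agreed = []
    for j in range(world_size):
        votes_alive = sum(1 for r in participants if local_views[r][j])
        # Rank j must have joined the exchange and be seen alive by a majority.
        agreed.append(j in local_views and 2 * votes_alive > len(participants))
    return agreed


# Hypothetical views: rank 1 is dead, and rank 2 wrongly suspects rank 0.
views = {
    0: [True, False, True, True],
    2: [False, False, True, True],
    3: [True, False, True, True],
}
agreed_mask = agree_on_active_ranks(views)
```

In this example the false positive against rank 0 is outvoted, and only rank 1, which never joined the exchange, is removed. A stricter rule, such as requiring unanimity, would remove rank 0 as well.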

4.4 Phase 4: Elastic Backend (P2)

Problem: After degradation, the group needs a way to scale back up to its original size.

Design: Extend the existing extension protocol (mooncake_backend.cpp:1208-1242):

1. Failure detected → degrade from N to N-1 ranks
2. External orchestrator notices → spawns replacement rank
3. Replacement rank joins as extension rank (existing protocol)
4. recoverRanks() restores the group to N ranks

The existing recoverRanks(), getPeerState(), and extendGroupSizeTo() provide most of the infrastructure. The missing piece is the detection→degradation path.

4.5 Phase 5: kill -9 Resilience (P3)

Hard kill (kill -9) is fundamentally harder because:

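  • No graceful shutdown: when only the process is killed, the kernel still closes its sockets, so peers typically see a FIN or RST; when the whole host or network path is lost, nothing is sent and the break is detected only by timeout (~60s by default)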
  • Shared memory segments may be corrupt
  • No cleanup handlers run

Approach:

  1. Lower TCP keepalive to 5s for faster detection of hard kills
  2. Use TCP_USER_TIMEOUT socket option to aggressively detect dead peers
  3. Add shared-memory watchdog (heartbeat flag written to pinned GPU memory)
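
Items 1 and 2 map to standard Linux socket options. The sketch below shows them on a Python socket for illustration. The helper name and the specific values are assumptions, and in Mooncake the equivalent setsockopt calls would be made in the C++ TCP transport.

```python
import socket


def enable_fast_dead_peer_detection(sock, idle_s=5, interval_s=1, probes=3,
                                    user_timeout_ms=5000):
    """Enable keepalive and, where available, tighter Linux TCP timeouts.

    Returns a dict of the options that were actually applied.
    """
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    applied = {"SO_KEEPALIVE": 1}
    wanted = (
        ("TCP_KEEPIDLE", idle_s),        # idle seconds before the first probe
        ("TCP_KEEPINTVL", interval_s),   # seconds between probes
        ("TCP_KEEPCNT", probes),         # unanswered probes before giving up
        ("TCP_USER_TIMEOUT", user_timeout_ms),  # max ms data may stay unacked
    )
    for name, value in wanted:
        option = getattr(socket, name, None)
        if option is None:
            continue  # constant not exposed on this platform
        sock.setsockopt(socket.IPPROTO_TCP, option, value)
        applied[name] = value
    return applied


sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
applied_options = enable_fast_dead_peer_detection(sock)
sock.close()
```

With these example values, an idle connection to a vanished peer would be declared dead after roughly idle_s + interval_s × probes seconds, and TCP_USER_TIMEOUT bounds how long unacknowledged data may stay in flight.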

5. Implementation Priority

| Priority | Item | Effort | Impact |
| --- | --- | --- | --- |
| P0 | Connection-aware activeRanks filtering | 2-3 days | Enables survivors to continue after clean exit |
| P0 | Out-of-band health check | 3-4 days | Eliminates false positives from kill -9 |
| P1 | Graceful degradation protocol | 3-4 days | Full FT for clean exits |
| P2 | Elastic backend (degrade + replace) | 4-5 days | End-to-end self-healing |
| P3 | kill -9 resilience | 5-7 days | Production hard-kill safety |

6. Experiment Reproducibility

To reproduce the kill -9 experiment:

# Inside zhouyuhan-gpu container:
export LD_LIBRARY_PATH="/usr/local/lib:/usr/local/lib/python3.10/dist-packages/torch/lib:..."
export LD_PRELOAD="/usr/local/lib/libgflags.so:/usr/local/lib/libglog.so:/usr/local/lib/libjsoncpp.so"
python3 /tmp/test_fault_injection.py

The test script spawns 4 ranks, runs continuous allreduce, sends kill -9 to rank 1, and measures detection/recovery.
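
The script itself is not included in this issue. As a rough sketch of the injection step only, the following POSIX-only example spawns stand-in worker processes (plain sleep loops, not real allreduce ranks), sends SIGKILL to one, and reports which workers are still running. All names here are hypothetical.

```python
import signal
import subprocess
import sys
import time

# Stand-in for a rank's allreduce loop; a real test would run the collective.
WORKER_CODE = "import time\nwhile True:\n    time.sleep(0.05)"


def run_sigkill_injection(num_ranks=4, victim=1, warmup_s=0.2):
    """Spawn workers, SIGKILL one, and report exit code and surviving ranks."""
    procs = [subprocess.Popen([sys.executable, "-c", WORKER_CODE])
             for _ in range(num_ranks)]
    try:
        time.sleep(warmup_s)
        t_kill = time.monotonic()
        procs[victim].send_signal(signal.SIGKILL)
        victim_rc = procs[victim].wait(timeout=10)
        reap_seconds = time.monotonic() - t_kill
        survivors_alive = [rank for rank, proc in enumerate(procs)
                           if rank != victim and proc.poll() is None]
        return {"victim_returncode": victim_rc,
                "survivors_alive": survivors_alive,
                "reap_seconds": reap_seconds}
    finally:
        # Always clean up the remaining workers.
        for proc in procs:
            if proc.poll() is None:
                proc.kill()
            proc.wait()


result = run_sigkill_injection()
```

A negative return code equal to -SIGKILL confirms the hard kill. In a real test, the survivors would also be polled for their peer-state view and for whether the next allreduce completes.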

Key Observable Metrics

| Metric | Current Value | Target |
| --- | --- | --- |
| Detection time (kill -9) | ~2s (false positive) | <10s (accurate) |
| Detection time (os._exit) | N/A (never recovers) | <10s |
| False positive rate | 67% (2/3 survivors) | 0% |
| Survivor continue rate | 0% | 100% |
| Time to resume collectives | ∞ (never) | <30s |

7. References

close:#1883
