fix(stats): close race in RuntimeStatsManager shutdown#6891
Conversation
… lost stats for fast-finishing pipelines When a pipeline finishes early (e.g. a hit LIMIT), `run_execution_loop` fires `finish_tx`, which the manager's `select!` can pick over pending `RegisterRuntimeStats` / `TakeInputSnapshot` messages on `node_rx`. The old shutdown path broke out of the loop without draining `node_rx` and only built `finished_snapshots` after the break, leaving a window in which `take_input_snapshot` saw `None` snapshots, sent on a channel the manager no longer read, and got `Err` back — surfacing as zero-stats events for completed tasks. Fix: - drain `node_rx` (try_recv) inside the `finish_rx` arm so late-arriving register/snapshot messages aren't dropped; - publish `finished_snapshots` before `break` so it's visible the moment the loop exits; - in `take_input_snapshot`, fall back to `finished_snapshots` after a channel send or response failure, since both can race with the manager's final shutdown step. Reproed locally as the flaky `test_limit_without_estimated_rows` failure on rust-tests-platform; with this change it passes 100/100. https://claude.ai/code/session_01NVUu3zCZwk3rdaiXpPnYKD
Rust Dependency DiffHead: ✅ OK: Within budget.
|
Greptile SummaryThis PR closes a shutdown race in
Confidence Score: 5/5Safe to merge — the change closes a documented race without altering the normal hot path, and all three shutdown windows are addressed consistently. The fix is logically tight: draining the channel before finalizing nodes, publishing finished_snapshots before break, and falling back gracefully in take_input_snapshot all compose correctly. The invariant that finished_snapshots is set before node_rx is dropped holds in all paths. No files require special attention. Important Files Changed
Sequence DiagramsequenceDiagram
participant Caller as take_input_snapshot
participant Chan as mpsc channel (node_rx)
participant Mgr as Manager event loop
participant FS as finished_snapshots (Mutex)
note over Mgr: finish_rx fires (pipeline done)
Mgr->>Chan: try_recv() drain loop
Chan-->>Mgr: RegisterRuntimeStats / TakeInputSnapshot (if queued)
Mgr->>Mgr: flush_and_finalize_node for active nodes
Mgr->>FS: write finished snapshots (before break)
Mgr->>Mgr: break, task exits, node_rx dropped
Caller->>FS: take_finished_snapshot (fast path)
alt already published
FS-->>Caller: Some(stats)
else not yet published
FS-->>Caller: None
Caller->>Chan: send TakeInputSnapshot
alt send fails (node_rx dropped)
Caller->>FS: take_finished_snapshot (fallback)
FS-->>Caller: Some(stats)
else send succeeds, await rx
Mgr-->>Caller: Ok(stats) via oneshot
else rx.await Err (responder dropped)
Caller->>FS: take_finished_snapshot (fallback)
FS-->>Caller: Some(stats)
end
end
Reviews (3): Last reviewed commit: "refactor(stats): extract handle_message ..." | Re-trigger Greptile |
The closure mutates passed-in `&mut` references, not captured state, so `mut` on the binding itself is unused. Caught by clippy in the style CI. https://claude.ai/code/session_01NVUu3zCZwk3rdaiXpPnYKD
|
@greptileai please take another look |
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #6891 +/- ##
==========================================
+ Coverage 75.37% 75.39% +0.02%
==========================================
Files 1135 1135
Lines 160346 160375 +29
==========================================
+ Hits 120857 120913 +56
+ Misses 39489 39462 -27
🚀 New features to boost your workflow:
|
The closure had grown to ~50 lines and only captured shared references that can be passed explicitly, mirroring flush_and_finalize_node's signature. Reads more easily as a method. https://claude.ai/code/session_01NVUu3zCZwk3rdaiXpPnYKD
|
@greptileai one more time please |
Summary
take_input_snapshotcould return empty stats for fast-finishing pipelines (e.g. a hitLIMIT), surfacing as the flakytest_limit_without_estimated_rowsfailure onrust-tests-platform.Race
When a pipeline finishes early,
run_execution_loopfiresfinish_tx. The manager's biasedselect!could pickfinish_rxover still-queuedRegisterRuntimeStats/TakeInputSnapshotmessages onnode_rx, then break the loop and only buildfinished_snapshotsafter the break. A concurrenttake_input_snapshotlanding in that window sawfinished_snapshots == None, sent on a channel the manager no longer read, and gotErrback — empty stats.Fix (
src/daft-local-execution/src/runtime_stats/mod.rs)node_rx(try_recv) inside thefinish_rxarm so late register messages aren't dropped.finished_snapshotsbeforebreak, so it's visible the moment the loop exits.take_input_snapshot, fall back tofinished_snapshotsafter a channel send/recv failure.Locally repro'd at ~10% on the test pre-fix; 0/100 with the fix.
https://claude.ai/code/session_01NVUu3zCZwk3rdaiXpPnYKD