[core] Fix task event loss during shutdown#60247
[core] Fix task event loss during shutdown#60247edoakes merged 14 commits intoray-project:masterfrom
Conversation
There was a problem hiding this comment.
Code Review
This pull request aims to fix task event loss during shutdown by ensuring that buffered events are flushed before the io_service is stopped. The changes in TaskEventBuffer correctly implement a synchronous wait for gRPC calls to complete. However, the same fix for RayEventRecorder is incomplete, as StopExportingEvents remains asynchronous, which could still lead to event loss. I've also found an issue in one of the new tests. My review includes suggestions to address these points.
0ffd54a to
438e741
Compare
9cc8050 to
b6b0dc8
Compare
396248b to
7bd049c
Compare
a8ae6c3 to
6d38b2d
Compare
6d38b2d to
9188910
Compare
b0aaecc to
00875fa
Compare
During shutdown, TaskEventBufferImpl::Stop() and RayEventRecorder were losing buffered events because the io_service was stopped immediately after calling async gRPC flush methods. This fix: - Adds a synchronous flush with configurable timeout in TaskEventBuffer::Stop() - Adds StopExportingEvents() method to RayEventRecorder for graceful shutdown - Calls StopExportingEvents() from GcsServer::Stop() before stopping io_service - Adds new config option task_events_shutdown_flush_timeout_ms (default 5s) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> Signed-off-by: ruo <ruoliu.dev@gmail.com>
Changes: - Wait for in-flight gRPC to complete before final flush - Perform final flush to send all buffered events - Wait for flush gRPC to complete before shutdown - Add payload checks to avoid sending empty gRPC requests - Use lambda helper for cleaner wait logic - Fix race condition: signal under mutex to avoid lost wakeup - Fix overlapping gRPC: skip export if gRPC already in progress - Fix re-enable window: use stopping_ flag instead of re-enabling to prevent new events during shutdown flush - Fix concurrent gRPC tracking: use atomic counters instead of booleans to correctly track multiple in-flight calls when periodic and shutdown flushes race This ensures no events are lost during shutdown while respecting timeouts to avoid hanging indefinitely. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> Signed-off-by: ruo <ruoliu.dev@gmail.com>
|
@sampan-s-nayak PTAL |
Add an enabled_ flag to RayEventRecorder that is set to false early during shutdown, preventing new events from being added after the final flush. This follows the same pattern as TaskEventBufferImpl::Stop() which sets enabled_ = false to block new events in AddTaskStatusEvent(). Without this fix, events added after ExportEvents() clears the buffer but before StopExportingEvents() returns would remain buffered and be lost when the io_service stops. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> Signed-off-by: ruo <ruoliu.dev@gmail.com>
Initialize enabled_ to true instead of false so events can be buffered before StartExportingEvents() is called. The previous initialization caused events to be silently dropped, breaking test_ray_node_events. The enabled_ flag is only meant to block new events during shutdown, not before startup. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> Signed-off-by: ruo <ruoliu.dev@gmail.com>
sampan-s-nayak
left a comment
There was a problem hiding this comment.
Thanks for working on this!
src/ray/common/ray_config_def.h
Outdated
| RAY_CONFIG(int64_t, task_events_dropped_task_attempt_batch_size, 10 * 1000) | ||
|
|
||
| /// Timeout in milliseconds to wait for task events to be flushed during shutdown. | ||
| /// During graceful shutdown, the TaskEventBuffer will wait up to this duration |
There was a problem hiding this comment.
| /// During graceful shutdown, the TaskEventBuffer will wait up to this duration | |
| /// During graceful shutdown, the TaskEventBuffer and RayEventRecorder will wait up to this duration |
|
|
||
| ASSERT_EQ(task_event_buffer_->GetNumTaskEventsStored(), num_events); | ||
|
|
||
| // Mock gRPC calls - follow existing test patterns (don't invoke callback) |
There was a problem hiding this comment.
do we need this comment?
| stopping_ = true; | ||
| FlushEvents(/*forced=*/true); | ||
| stopping_ = false; | ||
| wait_for_grpc_completion(); |
There was a problem hiding this comment.
with the current implementation we may end up exceeding flush_timeout_ms as both calls to wait_for_grpc_completion() have the same timeout (in the worst case we may end up spending 2*flush_timeout_ms)
| } | ||
|
|
||
| // Test that StopExportingEvents() flushes all buffered events. | ||
| // This verifies the fix for https://github.com/ray-project/ray/issues/60218 |
There was a problem hiding this comment.
do we need this comment.
|
|
||
| // First wait for any in-flight gRPC to complete, then flush, then wait again. | ||
| // This ensures no events are lost during shutdown. | ||
| if (wait_for_grpc_completion()) { |
There was a problem hiding this comment.
similar problem, we may exceed flush_timeout_ms
| // periodic export is still in flight. | ||
| if (grpc_in_progress_) { | ||
| RAY_LOG_EVERY_N_OR_DEBUG(WARNING, 100) | ||
| << "Skipping RayEventRecorder export: gRPC call already in progress."; |
There was a problem hiding this comment.
nit: we are not exactly skipping as we do eventually end up sending the event.
| << "Skipping RayEventRecorder export: gRPC call already in progress."; | |
| << "Previous RayEventRecorder export in progress: new events will be exported once previous export completes"; |
Bazel 9.0.0 requires explicit load statements for native rules like cc_binary and cc_library. Add the required load statement and MODULE.bazel for bzlmod support in the C++ example template. Changes: - Add load statement for rules_cc in _BUILD.bazel - Add _MODULE.bazel with rules_cc dependency - Update scripts.py to copy MODULE.bazel to generated template Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> Signed-off-by: ruo <ruoliu.dev@gmail.com>
Changes based on PR ray-project#60247 review feedback: - Fix shutdown timeout exceeding configured value by using a shared deadline for both wait_for_grpc_completion calls instead of resetting the timeout for each call (sampan-s-nayak) - Update config comment to mention RayEventRecorder (sampan-s-nayak) - Improve log message wording: "Previous export in progress" instead of "Skipping export" (sampan-s-nayak) - Remove unnecessary comments in tests (sampan-s-nayak) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> Signed-off-by: ruo <ruoliu.dev@gmail.com>
Signed-off-by: rlizzy <liuruo20021124@gmail.com>
Use .bazelversion to pin ray-template to Bazel 6.5.0 instead of adding MODULE.bazel for bzlmod support. This is simpler and ensures the template uses the same Bazel version as Ray itself. Changes: - Add _.bazelversion with version 6.5.0 - Remove _MODULE.bazel (no longer needed) - Fix typo: cpp_templete_dir -> cpp_template_dir (match upstream) - Update filename list in scripts.py Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> Signed-off-by: ruo <ruoliu.dev@gmail.com>
…ndles it PR ray-project#60378 already copies .bazelversion from Ray's root directory to the generated ray-template, so we don't need a separate _.bazelversion file in the template. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> Signed-off-by: ruo <ruoliu.dev@gmail.com>
The empty data check (num_task_events_to_send == 0 && num_dropped_task_attempts_to_send == 0) is now unreachable because FlushEvents already ensures there's actual data before calling SendRayEventsToAggregator via has_aggregator_payload check. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> Signed-off-by: ruo <ruoliu.dev@gmail.com>
Refactor duplicated shutdown logic from TaskEventBufferImpl::Stop() and RayEventRecorder::StopExportingEvents() into a shared utility function GracefulShutdownWithFlush in ray/util/graceful_shutdown.h. Both classes used identical "wait, flush, wait" patterns with nearly identical lambda implementations. The new utility takes the mutex, condition variable, idle-check callable, flush callable, timeout, and component name as parameters. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> Signed-off-by: ruo <ruoliu.dev@gmail.com>
| // This prevents the race where new events could be added during shutdown. | ||
| stopping_ = true; | ||
| FlushEvents(/*forced=*/true); | ||
| stopping_ = false; |
There was a problem hiding this comment.
Redundant stopping_ flag duplicates forced parameter purpose
Low Severity
The new stopping_ atomic flag is redundant with the existing forced parameter of FlushEvents. The stopping_ flag's sole purpose is to allow flushing when enabled_=false during shutdown. This check (!enabled_ && !stopping_) could be replaced with !enabled_ && !forced since forced=true is only ever passed during shutdown scenarios. This would eliminate the need for the extra member variable and the manual set/unset pattern around the FlushEvents call.
Additional Locations (2)
Address Cursor Bugbot feedback: - Fix silent timeout on second wait: now logs warning if flush wait times out instead of silently returning success - Change return type to void since callers don't use the return value (warnings are logged internally on timeout) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> Signed-off-by: ruo <ruoliu.dev@gmail.com>
src/ray/util/graceful_shutdown.h
Outdated
| @@ -0,0 +1,79 @@ | |||
| // Copyright 2025 The Ray Authors. | |||
There was a problem hiding this comment.
we should update year to 2026
src/ray/util/graceful_shutdown.h
Outdated
| template <typename IsIdleFn, typename FlushFn> | ||
| void GracefulShutdownWithFlush(absl::Mutex &mutex, | ||
| absl::CondVar &cv, | ||
| IsIdleFn &&is_idle_fn, |
There was a problem hiding this comment.
nit: maybe we can move absl::CondVar &cv, IsIdleFn &&is_idle_fn,FlushFn &&flush_fn into a common interface (which callers can implement) and simplify GracefulShutdownWithFlush(). example:
class IWaitableShutdown {
public:
virtual bool WaitUntilIdle(absl::Duration timeout) = 0;
virtual void Flush() = 0;
};
// we can return whether shutdown was successful or not and handle logging on caller side
bool GracefulShutdownWithFlush(IWaitableShutdown& component,
absl::Duration timeout,
std::string_view component_name);Address PR review feedback from sampan-s-nayak: - Update copyright year to 2026 - Replace template-based approach with GracefulShutdownHandler interface for cleaner function signature and better abstraction The new interface requires callers to implement WaitUntilIdle() and Flush() methods, simplifying the utility function signature from 6 parameters to 3. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> Signed-off-by: ruo <ruoliu.dev@gmail.com>
sampan-s-nayak
left a comment
There was a problem hiding this comment.
Thanks for working on this! LGTM
do you need to do this merge? it doesnt look like i have permission still @sampan-s-nayak
|
Fixes ray-project#60218 During shutdown, `TaskEventBufferImpl::Stop()` and `RayEventRecorder` were losing buffered events because the io_service was stopped immediately after calling async gRPC flush methods, without waiting for the gRPC calls to complete. This PR: - Adds a synchronous flush with configurable timeout in `TaskEventBuffer::Stop()` - waits up to 5 seconds (configurable via `task_events_shutdown_flush_timeout_ms`) for in-flight gRPC calls to complete - Adds `StopExportingEvents()` method to `RayEventRecorder` for graceful shutdown - Calls `StopExportingEvents()` from `GcsServer::Stop()` before stopping io_service - Adds new config option `task_events_shutdown_flush_timeout_ms` (default 5000ms) ## Test plan - [x] Added unit test `TestStopFlushesEvents` for `TaskEventBuffer` that verifies events are flushed during `Stop()` - [x] Added unit test `TestStopFlushesEvents` for `RayEventRecorder` that verifies events are exported during `StopExportingEvents()` - [ ] Ray CI tests 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Signed-off-by: ruo <ruoliu.dev@gmail.com> Signed-off-by: rlizzy <liuruo20021124@gmail.com> Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com> Co-authored-by: rlizzy <liuruo20021124@gmail.com>
Fixes ray-project#60218 During shutdown, `TaskEventBufferImpl::Stop()` and `RayEventRecorder` were losing buffered events because the io_service was stopped immediately after calling async gRPC flush methods, without waiting for the gRPC calls to complete. This PR: - Adds a synchronous flush with configurable timeout in `TaskEventBuffer::Stop()` - waits up to 5 seconds (configurable via `task_events_shutdown_flush_timeout_ms`) for in-flight gRPC calls to complete - Adds `StopExportingEvents()` method to `RayEventRecorder` for graceful shutdown - Calls `StopExportingEvents()` from `GcsServer::Stop()` before stopping io_service - Adds new config option `task_events_shutdown_flush_timeout_ms` (default 5000ms) ## Test plan - [x] Added unit test `TestStopFlushesEvents` for `TaskEventBuffer` that verifies events are flushed during `Stop()` - [x] Added unit test `TestStopFlushesEvents` for `RayEventRecorder` that verifies events are exported during `StopExportingEvents()` - [ ] Ray CI tests 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Signed-off-by: ruo <ruoliu.dev@gmail.com> Signed-off-by: rlizzy <liuruo20021124@gmail.com> Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com> Co-authored-by: rlizzy <liuruo20021124@gmail.com>
Fixes ray-project#60218 During shutdown, `TaskEventBufferImpl::Stop()` and `RayEventRecorder` were losing buffered events because the io_service was stopped immediately after calling async gRPC flush methods, without waiting for the gRPC calls to complete. This PR: - Adds a synchronous flush with configurable timeout in `TaskEventBuffer::Stop()` - waits up to 5 seconds (configurable via `task_events_shutdown_flush_timeout_ms`) for in-flight gRPC calls to complete - Adds `StopExportingEvents()` method to `RayEventRecorder` for graceful shutdown - Calls `StopExportingEvents()` from `GcsServer::Stop()` before stopping io_service - Adds new config option `task_events_shutdown_flush_timeout_ms` (default 5000ms) ## Test plan - [x] Added unit test `TestStopFlushesEvents` for `TaskEventBuffer` that verifies events are flushed during `Stop()` - [x] Added unit test `TestStopFlushesEvents` for `RayEventRecorder` that verifies events are exported during `StopExportingEvents()` - [ ] Ray CI tests 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Signed-off-by: ruo <ruoliu.dev@gmail.com> Signed-off-by: rlizzy <liuruo20021124@gmail.com> Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com> Co-authored-by: rlizzy <liuruo20021124@gmail.com> Signed-off-by: 400Ping <jiekaichang@apache.org>
Fixes ray-project#60218 During shutdown, `TaskEventBufferImpl::Stop()` and `RayEventRecorder` were losing buffered events because the io_service was stopped immediately after calling async gRPC flush methods, without waiting for the gRPC calls to complete. This PR: - Adds a synchronous flush with configurable timeout in `TaskEventBuffer::Stop()` - waits up to 5 seconds (configurable via `task_events_shutdown_flush_timeout_ms`) for in-flight gRPC calls to complete - Adds `StopExportingEvents()` method to `RayEventRecorder` for graceful shutdown - Calls `StopExportingEvents()` from `GcsServer::Stop()` before stopping io_service - Adds new config option `task_events_shutdown_flush_timeout_ms` (default 5000ms) ## Test plan - [x] Added unit test `TestStopFlushesEvents` for `TaskEventBuffer` that verifies events are flushed during `Stop()` - [x] Added unit test `TestStopFlushesEvents` for `RayEventRecorder` that verifies events are exported during `StopExportingEvents()` - [ ] Ray CI tests 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Signed-off-by: ruo <ruoliu.dev@gmail.com> Signed-off-by: rlizzy <liuruo20021124@gmail.com> Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com> Co-authored-by: rlizzy <liuruo20021124@gmail.com> Signed-off-by: Sirui Huang <ray.huang@anyscale.com>

Summary
Fixes #60218
During shutdown,
TaskEventBufferImpl::Stop()andRayEventRecorderwere losing buffered events because the io_service was stopped immediately after calling async gRPC flush methods, without waiting for the gRPC calls to complete.This PR:
TaskEventBuffer::Stop()- waits up to 5 seconds (configurable viatask_events_shutdown_flush_timeout_ms) for in-flight gRPC calls to completeStopExportingEvents()method toRayEventRecorderfor graceful shutdownStopExportingEvents()fromGcsServer::Stop()before stopping io_servicetask_events_shutdown_flush_timeout_ms(default 5000ms)Test plan
TestStopFlushesEventsforTaskEventBufferthat verifies events are flushed duringStop()TestStopFlushesEventsforRayEventRecorderthat verifies events are exported duringStopExportingEvents()🤖 Generated with Claude Code