Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,11 @@ RAY_CONFIG(uint64_t, task_events_max_num_profile_events_buffer_on_worker, 10 * 1
/// report to GCS.
RAY_CONFIG(int64_t, task_events_dropped_task_attempt_batch_size, 10 * 1000)

/// Timeout in milliseconds to wait for task events to be flushed during shutdown.
/// During graceful shutdown, the TaskEventBuffer will wait up to this duration
/// for in-flight gRPC calls to complete before stopping the io_service.
RAY_CONFIG(int64_t, task_events_shutdown_flush_timeout_ms, 5000)

/// The delay in ms that GCS should mark any running tasks from a job as failed.
/// Setting this value too smaller might result in some finished tasks marked as failed by
/// GCS.
Expand Down
66 changes: 61 additions & 5 deletions src/ray/core_worker/task_event_buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -525,8 +525,39 @@ void TaskEventBufferImpl::Stop() {
if (!enabled_) {
return;
}
// Set enabled_ to false early to prevent double-stop and disable new events
enabled_ = false;
RAY_LOG(INFO) << "Shutting down TaskEventBuffer.";

auto flush_timeout_ms = RayConfig::instance().task_events_shutdown_flush_timeout_ms();

// Helper to wait for in-flight gRPC calls to complete with timeout.
// Returns true if completed, false if timed out.
auto wait_for_grpc_completion = [&]() {
absl::MutexLock lock(&grpc_completion_mutex_);
auto deadline = absl::Now() + absl::Milliseconds(flush_timeout_ms);
while (gcs_grpc_in_progress_ || event_aggregator_grpc_in_progress_) {
if (grpc_completion_cv_.WaitWithDeadline(&grpc_completion_mutex_, deadline)) {
return false; // Timeout
}
}
return true; // Completed
};

// First wait for any in-flight gRPC to complete, then flush, then wait again.
// This ensures no events are lost during shutdown.
if (wait_for_grpc_completion()) {
// Use stopping_ flag to allow flush without re-enabling event ingestion.
// This prevents the race where new events could be added during shutdown.
stopping_ = true;
FlushEvents(/*forced=*/true);
stopping_ = false;
wait_for_grpc_completion();
} else {
RAY_LOG(WARNING) << "TaskEventBuffer shutdown timed out waiting for gRPC. "
<< "Some events may be lost.";
}

// Shutting down the io service to exit the io_thread. This should prevent
// any other callbacks to be run on the io thread.
io_service_.stop();
Expand Down Expand Up @@ -816,7 +847,12 @@ void TaskEventBufferImpl::SendTaskEventsToGCS(std::unique_ptr<rpc::TaskEventData
num_dropped_task_attempts_to_send);
this->stats_counter_.Increment(kTotalTaskEventsBytesReported, num_bytes_to_send);
}
gcs_grpc_in_progress_ = false;
// Signal under mutex to avoid lost wakeup race condition
{
absl::MutexLock lock(&grpc_completion_mutex_);
gcs_grpc_in_progress_ = false;
grpc_completion_cv_.Signal();
}
};
task_accessor->AsyncAddTaskEventData(std::move(data), on_complete);
}
Expand Down Expand Up @@ -850,11 +886,19 @@ void TaskEventBufferImpl::SendRayEventsToAggregator(
TaskEventBufferCounter::kTotalNumLostTaskAttemptsReportedToAggregator,
num_dropped_task_attempts_to_send);
}
event_aggregator_grpc_in_progress_ = false;
// Signal under mutex to avoid lost wakeup race condition
{
absl::MutexLock lock(&grpc_completion_mutex_);
event_aggregator_grpc_in_progress_ = false;
grpc_completion_cv_.Signal();
}
};

if (num_task_events_to_send == 0 && num_dropped_task_attempts_to_send == 0) {
// Signal under mutex to avoid lost wakeup race condition
absl::MutexLock lock(&grpc_completion_mutex_);
event_aggregator_grpc_in_progress_ = false;
grpc_completion_cv_.Signal();
} else {
rpc::events::AddEventsRequest request;
*request.mutable_events_data() = std::move(*data);
Expand All @@ -863,7 +907,9 @@ void TaskEventBufferImpl::SendRayEventsToAggregator(
}

void TaskEventBufferImpl::FlushEvents(bool forced) {
if (!enabled_) {
// Allow flush during shutdown (stopping_) even if enabled_ is false.
// This ensures the final flush happens without re-enabling event ingestion.
if (!enabled_ && !stopping_) {
return;
}

Expand Down Expand Up @@ -909,10 +955,20 @@ void TaskEventBufferImpl::FlushEvents(bool forced) {
if (export_event_write_enabled_) {
WriteExportData(status_events_to_write_for_export, profile_events_to_send);
}
if (send_task_events_to_gcs_enabled_) {
// Only send to GCS if there's actual data or metadata to send.
const bool has_gcs_payload =
data.task_event_data && (data.task_event_data->events_by_task_size() > 0 ||
data.task_event_data->dropped_task_attempts_size() > 0 ||
data.task_event_data->num_profile_events_dropped() > 0);
if (send_task_events_to_gcs_enabled_ && has_gcs_payload) {
SendTaskEventsToGCS(std::move(data.task_event_data));
}
if (send_ray_events_to_aggregator_enabled_) {
// Only send to event aggregator if there's actual data or metadata to send.
const bool has_aggregator_payload =
data.ray_events_data &&
(data.ray_events_data->events_size() > 0 ||
data.ray_events_data->task_events_metadata().dropped_task_attempts_size() > 0);
if (send_ray_events_to_aggregator_enabled_ && has_aggregator_payload) {
SendRayEventsToAggregator(std::move(data.ray_events_data));
}
}
Expand Down
14 changes: 14 additions & 0 deletions src/ray/core_worker/task_event_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,15 @@ class TaskEventBufferImpl : public TaskEventBuffer {
/// True if there's a pending gRPC call to the event aggregator.
std::atomic<bool> event_aggregator_grpc_in_progress_ = false;

/// Mutex and condition variable for waiting on gRPC completion during shutdown.
absl::Mutex grpc_completion_mutex_;
absl::CondVar grpc_completion_cv_;

/// True during shutdown to allow final flush without re-enabling event ingestion.
/// This prevents the race where new events could be added during the brief window
/// when enabled_ would otherwise be set back to true for flushing.
std::atomic<bool> stopping_ = false;

/// If true, task events are exported for Export API
bool export_event_write_enabled_ = false;

Expand Down Expand Up @@ -632,6 +641,11 @@ class TaskEventBufferImpl : public TaskEventBuffer {
FRIEND_TEST(TaskEventBufferTest, TestCreateRayEventsDataWithProfileEvents);
FRIEND_TEST(TaskEventBufferTestDifferentDestination,
TestMixedStatusAndProfileEventsToRayEvents);
FRIEND_TEST(TaskEventBufferTestDifferentDestination, TestStopFlushesEvents);
FRIEND_TEST(TaskEventBufferTestDifferentDestination,
TestStopWaitsForInflightThenFlushes);
FRIEND_TEST(TaskEventBufferTestDroppedAttemptsOnly,
TestFlushSendsDroppedAttemptsWithoutEvents);
};

} // namespace worker
Expand Down
Loading