Skip to content

Commit bbbe9ef

Browse files
authored
[Core] Add Logic to Emit Task Events to Event Aggregator (ray-project#53402)
This PR adds the logic to emit task events to event aggregator, mainly: * Update the task event buffer logic to convert and send the ray event to the event aggregator at the same time when the task events are converted and sent to GCS * Added 2 configs to control turning on/off the path to send to GCS and the path to send to event aggregator * Added test task event buffer tests for the ray event to aggregator path --------- Signed-off-by: Mengjin Yan <mengjinyan3@gmail.com> Signed-off-by: myan <myan@anyscale.com>
1 parent 9b03427 commit bbbe9ef

24 files changed

+1075
-294
lines changed

BUILD.bazel

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,9 @@ ray_cc_library(
196196
name = "event_aggregator_client",
197197
hdrs = ["src/ray/rpc/event_aggregator_client.h"],
198198
deps = [
199+
":event_aggregator_cc_rpc",
199200
":grpc_client",
201+
"//src/ray/common:status",
200202
"//src/ray/protobuf:events_event_aggregator_service_cc_proto",
201203
"//src/ray/util:logging",
202204
"@com_github_grpc_grpc//:grpc++",
@@ -389,6 +391,13 @@ ray_cc_library(
389391
],
390392
)
391393

394+
cc_grpc_library(
395+
name = "event_aggregator_cc_rpc",
396+
srcs = ["//src/ray/protobuf:events_event_aggregator_service_proto"],
397+
grpc_only = True,
398+
deps = ["//src/ray/protobuf:events_event_aggregator_service_cc_proto"],
399+
)
400+
392401
# pubsub.
393402
cc_grpc_library(
394403
name = "pubsub_cc_grpc",

python/ray/dashboard/modules/aggregator/aggregator_agent.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -213,10 +213,10 @@ def _receive_events(self, request):
213213
status_message += (
214214
f", and {len(error_messages) - truncate_num} more events dropped"
215215
)
216-
status = events_event_aggregator_service_pb2.AddEventStatus(
216+
status = events_event_aggregator_service_pb2.AddEventsStatus(
217217
code=status_code, message=status_message
218218
)
219-
return events_event_aggregator_service_pb2.AddEventReply(status=status)
219+
return events_event_aggregator_service_pb2.AddEventsReply(status=status)
220220

221221
def _send_events_to_external_service(self, event_batch):
222222
"""

python/ray/dashboard/modules/aggregator/tests/test_aggregator_agent.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
EventAggregatorServiceStub,
2323
)
2424
from ray.core.generated.events_event_aggregator_service_pb2 import (
25-
AddEventRequest,
25+
AddEventsRequest,
2626
RayEventsData,
2727
TaskEventsMetadata,
2828
)
@@ -92,7 +92,7 @@ def test_aggregator_agent_receive_publish_events_normally(
9292
seconds, nanos = divmod(test_time, 10**9)
9393
timestamp = Timestamp(seconds=seconds, nanos=nanos)
9494

95-
request = AddEventRequest(
95+
request = AddEventsRequest(
9696
events_data=RayEventsData(
9797
events=[
9898
RayEvent(
@@ -154,7 +154,7 @@ def test_aggregator_agent_receive_event_full(
154154
seconds, nanos = divmod(now, 10**9)
155155
timestamp = Timestamp(seconds=seconds, nanos=nanos)
156156

157-
request = AddEventRequest(
157+
request = AddEventsRequest(
158158
events_data=RayEventsData(
159159
events=[
160160
RayEvent(
@@ -196,7 +196,7 @@ def test_aggregator_agent_receive_dropped_at_core_worker(
196196
seconds, nanos = divmod(now, 10**9)
197197
timestamp = Timestamp(seconds=seconds, nanos=nanos)
198198

199-
request = AddEventRequest(
199+
request = AddEventsRequest(
200200
events_data=RayEventsData(
201201
events=[
202202
RayEvent(
@@ -247,7 +247,7 @@ def test_aggregator_agent_receive_multiple_events(
247247
now = time.time_ns()
248248
seconds, nanos = divmod(now, 10**9)
249249
timestamp = Timestamp(seconds=seconds, nanos=nanos)
250-
request = AddEventRequest(
250+
request = AddEventsRequest(
251251
events_data=RayEventsData(
252252
events=[
253253
RayEvent(
@@ -306,7 +306,7 @@ def test_aggregator_agent_receive_multiple_events_failures(
306306
now = time.time_ns()
307307
seconds, nanos = divmod(now, 10**9)
308308
timestamp = Timestamp(seconds=seconds, nanos=nanos)
309-
request = AddEventRequest(
309+
request = AddEventsRequest(
310310
events_data=RayEventsData(
311311
events=[
312312
RayEvent(
@@ -353,7 +353,7 @@ def test_aggregator_agent_receive_empty_events(
353353
cluster.webui_url, cluster.gcs_address, cluster.head_node.node_id
354354
)
355355
httpserver.expect_request("/", method="POST").respond_with_data("", status=200)
356-
request = AddEventRequest(
356+
request = AddEventsRequest(
357357
events_data=RayEventsData(
358358
events=[],
359359
task_events_metadata=TaskEventsMetadata(

python/ray/tests/test_metrics_agent.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
from ray.core.generated.common_pb2 import TaskAttempt
2323
from ray.core.generated.events_base_event_pb2 import RayEvent
2424
from ray.core.generated.events_event_aggregator_service_pb2 import (
25-
AddEventRequest,
25+
AddEventsRequest,
2626
RayEventsData,
2727
TaskEventsMetadata,
2828
)
@@ -528,7 +528,7 @@ def test_case_value_correct():
528528
now = time.time_ns()
529529
seconds, nanos = divmod(now, 10**9)
530530
timestamp = Timestamp(seconds=seconds, nanos=nanos)
531-
request = AddEventRequest(
531+
request = AddEventsRequest(
532532
events_data=RayEventsData(
533533
events=[
534534
RayEvent(

src/ray/common/grpc_util.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
#include <google/protobuf/map.h>
1818
#include <google/protobuf/repeated_field.h>
19+
#include <google/protobuf/timestamp.pb.h>
1920
#include <google/protobuf/util/message_differencer.h>
2021
#include <grpcpp/grpcpp.h>
2122

@@ -231,4 +232,15 @@ inline grpc::ChannelArguments CreateDefaultChannelArguments() {
231232
return arguments;
232233
}
233234

235+
// Convert an epoch time in nanoseconds to a protobuf timestamp
236+
// Ref: https://protobuf.dev/reference/php/api-docs/Google/Protobuf/Timestamp.html
237+
inline google::protobuf::Timestamp AbslTimeNanosToProtoTimestamp(int64_t nanos) {
238+
google::protobuf::Timestamp timestamp;
239+
240+
// Extract the seconds and the fractional nanoseconds from the epoch time
241+
timestamp.set_seconds(nanos / 1000000000);
242+
timestamp.set_nanos(nanos % 1000000000);
243+
return timestamp;
244+
}
245+
234246
} // namespace ray

src/ray/common/ray_config_def.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -921,6 +921,17 @@ RAY_CONFIG(bool, enable_export_api_write, false)
921921
// Example config: `export RAY_enable_export_api_write_config='EXPORT_ACTOR,EXPORT_TASK'`
922922
RAY_CONFIG(std::vector<std::string>, enable_export_api_write_config, {})
923923

924+
// Whether the task events from the core worker are sent to GCS directly.
925+
// TODO(myan): #54515 Remove this flag after the task events to GCS path is fully
926+
// migrated to the event aggregator.
927+
RAY_CONFIG(bool, enable_core_worker_task_event_to_gcs, true)
928+
929+
// Whether to enable the ray event to send to the event aggregator.
930+
// Currently, only task events are supported.
931+
// TODO(myan): #54515 Remove this flag after the task events are fully migrated to the
932+
// event aggregator.
933+
RAY_CONFIG(bool, enable_core_worker_ray_event_to_aggregator, false)
934+
924935
// Configuration for pipe logger buffer size.
925936
RAY_CONFIG(uint64_t, pipe_logger_read_buf_size, 1024)
926937

src/ray/core_worker/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,7 @@ ray_cc_library(
207207
srcs = ["task_event_buffer.cc"],
208208
hdrs = ["task_event_buffer.h"],
209209
deps = [
210+
"//:event_aggregator_client",
210211
"//src/ray/common:asio",
211212
"//src/ray/common:id",
212213
"//src/ray/common:task_common",

src/ray/core_worker/core_worker.cc

Lines changed: 30 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
#include "ray/common/task/task_util.h"
4242
#include "ray/gcs/gcs_client/gcs_client.h"
4343
#include "ray/gcs/pb_util.h"
44+
#include "ray/rpc/event_aggregator_client.h"
4445
#include "ray/util/container_util.h"
4546
#include "ray/util/event.h"
4647
#include "ray/util/subreaper.h"
@@ -399,7 +400,7 @@ CoreWorker::CoreWorker(
399400
options_.language,
400401
worker_context_->GetCurrentJobID(),
401402
// Driver has no parent task
402-
/* parent_task_id */ TaskID::Nil(),
403+
/*parent_task_id=*/TaskID::Nil(),
403404
GetCallerId(),
404405
rpc_address_,
405406
TaskID::Nil());
@@ -417,6 +418,7 @@ CoreWorker::CoreWorker(
417418
/*attempt_number=*/0,
418419
rpc::TaskStatus::RUNNING,
419420
/*timestamp=*/absl::GetCurrentTimeNanos(),
421+
/*is_actor_task=*/false,
420422
std::make_shared<const TaskSpecification>(std::move(spec)));
421423
task_event_buffer_->AddTaskEvent(std::move(task_event));
422424
}
@@ -588,7 +590,8 @@ void CoreWorker::Disconnect(
588590
worker_context_->GetCurrentJobID(),
589591
/* attempt_number */ 0,
590592
rpc::TaskStatus::FINISHED,
591-
/* timestamp */ absl::GetCurrentTimeNanos());
593+
/*timestamp=*/absl::GetCurrentTimeNanos(),
594+
/*is_actor_task_event=*/worker_context_->GetCurrentActorID().IsNil());
592595
task_event_buffer_->AddTaskEvent(std::move(task_event));
593596
}
594597

@@ -1041,7 +1044,7 @@ Status CoreWorker::PutInLocalPlasmaStore(const RayObject &object,
10411044
bool pin_object) {
10421045
bool object_exists = false;
10431046
RAY_RETURN_NOT_OK(plasma_store_provider_->Put(
1044-
object, object_id, /* owner_address = */ rpc_address_, &object_exists));
1047+
object, object_id, /*owner_address=*/rpc_address_, &object_exists));
10451048
if (!object_exists) {
10461049
if (pin_object) {
10471050
// Tell the raylet to pin the object **after** it is created.
@@ -1158,7 +1161,7 @@ Status CoreWorker::CreateOwnedAndIncrementLocalRef(
11581161
status = plasma_store_provider_->Create(metadata,
11591162
data_size,
11601163
*object_id,
1161-
/* owner_address = */ real_owner_address,
1164+
/*owner_address=*/real_owner_address,
11621165
data,
11631166
/*created_by_worker=*/true,
11641167
is_experimental_mutable_object);
@@ -1200,7 +1203,7 @@ Status CoreWorker::ExperimentalChannelWriteAcquire(
12001203
int64_t timeout_ms,
12011204
std::shared_ptr<Buffer> *data) {
12021205
Status status = experimental_mutable_object_provider_->GetChannelStatus(
1203-
object_id, /*is_reader*/ false);
1206+
object_id, /*is_reader=*/false);
12041207
if (!status.ok()) {
12051208
return status;
12061209
}
@@ -1350,7 +1353,7 @@ Status CoreWorker::Get(const std::vector<ObjectID> &ids,
13501353
bool is_experimental_channel = false;
13511354
for (const ObjectID &id : ids) {
13521355
Status status =
1353-
experimental_mutable_object_provider_->GetChannelStatus(id, /*is_reader*/ true);
1356+
experimental_mutable_object_provider_->GetChannelStatus(id, /*is_reader=*/true);
13541357
if (status.ok()) {
13551358
is_experimental_channel = true;
13561359
// We continue rather than break because we want to check that *all* of the
@@ -2052,7 +2055,7 @@ std::vector<rpc::ObjectReference> CoreWorker::SubmitTask(
20522055
/*include_job_config*/ true,
20532056
/*generator_backpressure_num_objects*/
20542057
task_options.generator_backpressure_num_objects,
2055-
/*enable_task_event*/ task_options.enable_task_events,
2058+
/*enable_task_event=*/task_options.enable_task_events,
20562059
task_options.labels,
20572060
task_options.label_selector);
20582061
ActorID root_detached_actor_id;
@@ -2136,10 +2139,10 @@ Status CoreWorker::CreateActor(const RayFunction &function,
21362139
rpc_address_,
21372140
function,
21382141
args,
2139-
/*num_returns*/ 0,
2142+
/*num_returns=*/0,
21402143
new_resource,
21412144
new_placement_resources,
2142-
"" /* debugger_breakpoint */,
2145+
/*debugger_breakpoint=*/"",
21432146
depth,
21442147
actor_creation_options.serialized_runtime_env_info,
21452148
call_site,
@@ -2242,7 +2245,7 @@ Status CoreWorker::CreateActor(const RayFunction &function,
22422245
task_spec,
22432246
CurrentCallSite(),
22442247
// Actor creation task retry happens on GCS not on core worker.
2245-
/*max_retries*/ 0);
2248+
/*max_retries=*/0);
22462249

22472250
if (actor_name.empty()) {
22482251
io_service_.post(
@@ -2416,19 +2419,19 @@ Status CoreWorker::SubmitActorTask(
24162419
task_options.num_returns,
24172420
task_options.resources,
24182421
required_resources,
2419-
"", /* debugger_breakpoint */
2420-
depth, /*depth*/
2421-
"{}", /* serialized_runtime_env_info */
2422+
/*debugger_breakpoint=*/"",
2423+
depth,
2424+
/*serialized_runtime_env_info=*/"{}",
24222425
call_site,
24232426
worker_context_->GetMainThreadOrActorCreationTaskID(),
24242427
task_options.concurrency_group_name,
2425-
/*include_job_config*/ false,
2426-
/*generator_backpressure_num_objects*/
2428+
/*include_job_config=*/false,
2429+
/*generator_backpressure_num_objects=*/
24272430
task_options.generator_backpressure_num_objects,
2428-
/*enable_task_events*/ task_options.enable_task_events,
2429-
/*labels*/ {},
2430-
/*label_selector*/ {},
2431-
/*tensor_transport*/ task_options.tensor_transport);
2431+
/*enable_task_events=*/task_options.enable_task_events,
2432+
/*labels=*/{},
2433+
/*label_selector=*/{},
2434+
/*tensor_transport=*/task_options.tensor_transport);
24322435
// NOTE: placement_group_capture_child_tasks and runtime_env will
24332436
// be ignored in the actor because we should always follow the actor's option.
24342437

@@ -2824,7 +2827,7 @@ Status CoreWorker::ExecuteTask(
28242827
task_spec.AttemptNumber(),
28252828
task_spec,
28262829
rpc::TaskStatus::RUNNING,
2827-
/* include_task_info */ false,
2830+
/*include_task_info=*/false,
28282831
update));
28292832

28302833
worker_context_->SetCurrentTask(task_spec);
@@ -3212,7 +3215,7 @@ void CoreWorker::HandleReportGeneratorItemReturns(
32123215
auto worker_id = WorkerID::FromBinary(request.worker_addr().worker_id());
32133216
task_manager_->HandleReportGeneratorItemReturns(
32143217
request,
3215-
/*execution_signal_callback*/
3218+
/*execution_signal_callback=*/
32163219
[reply,
32173220
worker_id = std::move(worker_id),
32183221
generator_id = std::move(generator_id),
@@ -3736,8 +3739,8 @@ void CoreWorker::HandleUpdateObjectLocationBatch(
37363739
}
37373740

37383741
send_reply_callback(Status::OK(),
3739-
/*success_callback_on_reply*/ nullptr,
3740-
/*failure_callback_on_reply*/ nullptr);
3742+
/*success_callback_on_reply=*/nullptr,
3743+
/*failure_callback_on_reply=*/nullptr);
37413744
}
37423745

37433746
void CoreWorker::AddSpilledObjectLocationOwner(
@@ -3975,7 +3978,7 @@ void CoreWorker::CancelTaskOnExecutor(TaskID task_id,
39753978
}
39763979
}
39773980

3978-
on_canceled(/*success*/ success, /*requested_task_running*/ requested_task_running);
3981+
on_canceled(/*success=*/success, /*requested_task_running=*/requested_task_running);
39793982
}
39803983

39813984
void CoreWorker::CancelActorTaskOnExecutor(WorkerID caller_worker_id,
@@ -4567,7 +4570,7 @@ void CoreWorker::RecordTaskLogStart(const TaskID &task_id,
45674570
attempt_number,
45684571
*current_task,
45694572
rpc::TaskStatus::NIL,
4570-
/* include_task_info */ false,
4573+
/*include_task_info=*/false,
45714574
worker::TaskStatusEvent::TaskStateUpdate(task_log_info)));
45724575
}
45734576

@@ -4591,7 +4594,7 @@ void CoreWorker::RecordTaskLogEnd(const TaskID &task_id,
45914594
attempt_number,
45924595
*current_task,
45934596
rpc::TaskStatus::NIL,
4594-
/* include_task_info */ false,
4597+
/*include_task_info=*/false,
45954598
worker::TaskStatusEvent::TaskStateUpdate(task_log_info)));
45964599
}
45974600

@@ -4609,7 +4612,7 @@ void CoreWorker::UpdateTaskIsDebuggerPaused(const TaskID &task_id,
46094612
running_task_it->second.AttemptNumber(),
46104613
running_task_it->second,
46114614
rpc::TaskStatus::NIL,
4612-
/* include_task_info */ false,
4615+
/*include_task_info=*/false,
46134616
worker::TaskStatusEvent::TaskStateUpdate(is_debugger_paused)));
46144617
}
46154618

src/ray/core_worker/core_worker_process.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,9 @@ std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::CreateCoreWorker(
171171
}
172172

173173
auto task_event_buffer = std::make_unique<worker::TaskEventBufferImpl>(
174-
std::make_shared<gcs::GcsClient>(options.gcs_options));
174+
std::make_unique<gcs::GcsClient>(options.gcs_options),
175+
std::make_unique<rpc::EventAggregatorClientImpl>(options.metrics_agent_port,
176+
*client_call_manager));
175177

176178
// Start the IO thread first to make sure the checker is working.
177179
boost::thread::attributes io_thread_attrs;

0 commit comments

Comments
 (0)