@@ -89,9 +89,8 @@ NodeManager::NodeManager(boost::asio::io_service &io_service,
      object_manager_profile_timer_(io_service),
      initial_config_(config),
      local_available_resources_(config.resource_config),
-      worker_pool_(io_service, config.num_initial_workers,
-                   config.maximum_startup_concurrency, gcs_client_,
-                   config.worker_commands),
+      worker_pool_(config.num_initial_workers, config.maximum_startup_concurrency,
+                   gcs_client_, config.worker_commands),
      scheduling_policy_(local_queues_),
      reconstruction_policy_(
          io_service_,
@@ -229,23 +228,22 @@ void NodeManager::HandleUnexpectedWorkerFailure(
}

void NodeManager::KillWorker(std::shared_ptr<Worker> worker) {
-#ifdef _WIN32
-  // TODO(mehrdadn): Implement graceful process termination mechanism
-#else
  // If we're just cleaning up a single worker, allow it some time to clean
  // up its state before force killing. The client socket will be closed
  // and the worker struct will be freed after the timeout.
-  kill(worker->Process().get()->id(), SIGTERM);
-#endif
+  kill(worker->Pid(), SIGTERM);

  auto retry_timer = std::make_shared<boost::asio::deadline_timer>(io_service_);
  auto retry_duration = boost::posix_time::milliseconds(
      RayConfig::instance().kill_worker_timeout_milliseconds());
  retry_timer->expires_from_now(retry_duration);
  retry_timer->async_wait([retry_timer, worker](const boost::system::error_code &error) {
-    RAY_LOG(DEBUG) << "Send SIGKILL to worker, pid=" << worker->Process().get()->id();
-    // Force kill worker
-    worker->Process().get()->terminate();
+    RAY_LOG(DEBUG) << "Send SIGKILL to worker, pid=" << worker->Pid();
+    // Force kill worker. TODO(mehrdadn, rkn): The worker may have already died
+    // and had its PID reassigned to a different process, at least on Windows.
+    // On Linux, this may or may not be the case, depending on e.g. whether
+    // the process has been already waited on. Regardless, this must be fixed.
+    kill(worker->Pid(), SIGKILL);
  });
}

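For reference, a minimal standalone sketch of the shutdown pattern the hunk above uses: ask the process to exit with SIGTERM, then escalate to SIGKILL via a boost::asio deadline timer if it has not gone away by the timeout. This is an illustration, not Ray's API; the function name, parameters, and timeout handling here are assumptions, and the PID-reuse caveat from the TODO in the hunk applies to this sketch as well.

    // Graceful-then-forceful termination sketch (assumed names, POSIX signals).
    #include <boost/asio.hpp>
    #include <memory>
    #include <signal.h>
    #include <sys/types.h>

    void KillGracefully(boost::asio::io_service &io_service, pid_t pid, int timeout_ms) {
      kill(pid, SIGTERM);  // polite request; give the process a chance to clean up
      auto timer = std::make_shared<boost::asio::deadline_timer>(io_service);
      timer->expires_from_now(boost::posix_time::milliseconds(timeout_ms));
      // Capturing the shared_ptr keeps the timer alive until the handler runs.
      timer->async_wait([timer, pid](const boost::system::error_code &) {
        kill(pid, SIGKILL);  // escalate; note the PID-reuse caveat from the TODO above
      });
    }
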
@@ -857,9 +855,8 @@ void NodeManager::ProcessClientMessage(
  RAY_LOG(DEBUG) << "[Worker] Message "
                 << protocol::EnumNameMessageType(message_type_value) << "("
                 << message_type << ") from worker with PID "
-                << (registered_worker
-                        ? std::to_string(registered_worker->Process().get()->id())
-                        : "nil");
+                << (registered_worker ? std::to_string(registered_worker->Pid())
+                                      : "nil");
  if (registered_worker && registered_worker->IsDead()) {
    // For a worker that is marked as dead (because the job has died already),
    // all the messages are ignored except DisconnectClient.
@@ -966,6 +963,12 @@ void NodeManager::ProcessClientMessage(
void NodeManager::ProcessRegisterClientRequestMessage(
    const std::shared_ptr<LocalClientConnection> &client, const uint8_t *message_data) {
  client->Register();
+  auto message = flatbuffers::GetRoot<protocol::RegisterClientRequest>(message_data);
+  Language language = static_cast<Language>(message->language());
+  WorkerID worker_id = from_flatbuf<WorkerID>(*message->worker_id());
+  auto worker = std::make_shared<Worker>(worker_id, message->worker_pid(), language,
+                                         message->port(), client, client_call_manager_);
+  Status status;
  flatbuffers::FlatBufferBuilder fbb;
  auto reply =
      ray::protocol::CreateRegisterClientReply(fbb, to_flatbuf(fbb, self_node_id_));
@@ -980,31 +983,24 @@ void NodeManager::ProcessRegisterClientRequestMessage(
    }
  });

-  auto message = flatbuffers::GetRoot<protocol::RegisterClientRequest>(message_data);
-  Language language = static_cast<Language>(message->language());
-  WorkerID worker_id = from_flatbuf<WorkerID>(*message->worker_id());
-  pid_t pid = message->worker_pid();
-  auto worker = std::make_shared<Worker>(worker_id, language, message->port(), client,
-                                         client_call_manager_);
  if (message->is_worker()) {
    // Register the new worker.
-    if (worker_pool_.RegisterWorker(worker, pid).ok()) {
+    if (worker_pool_.RegisterWorker(worker).ok()) {
      HandleWorkerAvailable(worker->Connection());
    }
  } else {
    // Register the new driver.
-    worker->SetProcess(ProcessHandle::FromPid(pid));
    const JobID job_id = from_flatbuf<JobID>(*message->job_id());
    // Compute a dummy driver task id from a given driver.
    const TaskID driver_task_id = TaskID::ComputeDriverTaskId(worker_id);
    worker->AssignTaskId(driver_task_id);
    worker->AssignJobId(job_id);
-    Status status = worker_pool_.RegisterDriver(worker);
+    status = worker_pool_.RegisterDriver(worker);
    if (status.ok()) {
      local_queues_.AddDriverTaskId(driver_task_id);
-      auto job_data_ptr =
-          gcs::CreateJobTableData(job_id, /*is_dead*/ false, std::time(nullptr),
-                                  initial_config_.node_manager_address, pid);
+      auto job_data_ptr = gcs::CreateJobTableData(
+          job_id, /*is_dead*/ false, std::time(nullptr),
+          initial_config_.node_manager_address, message->worker_pid());
      RAY_CHECK_OK(gcs_client_->Jobs().AsyncAdd(job_data_ptr, nullptr));
    }
  }
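The two hunks above rely on a refactored Worker whose constructor takes the worker's PID directly and exposes it through Pid(), replacing the worker->Process().get()->id() chain that the remaining hunks also remove. A reduced sketch of that assumed interface is shown below; it is not Ray's full Worker class (which also carries the worker ID, language, port, connection, and call manager visible in the diff), just the part these call sites depend on.

    // Reduced sketch of the assumed Worker interface; only the PID handling is shown.
    #include <sys/types.h>

    class WorkerSketch {
     public:
      explicit WorkerSketch(pid_t pid) : pid_(pid) {}
      // Replaces worker->Process().get()->id() at the call sites updated below.
      pid_t Pid() const { return pid_; }

     private:
      const pid_t pid_;
    };
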
@@ -1200,8 +1196,7 @@ void NodeManager::ProcessDisconnectClientMessage(
    cluster_resource_map_[self_node_id_].Release(lifetime_resources.ToResourceSet());
    worker->ResetLifetimeResourceIds();

-    RAY_LOG(DEBUG) << "Worker (pid=" << worker->Process().get()->id()
-                   << ") is disconnected. "
+    RAY_LOG(DEBUG) << "Worker (pid=" << worker->Pid() << ") is disconnected. "
                   << "job_id: " << worker->GetAssignedJobId();

    // Since some resources may have been released, we can try to dispatch more tasks.
@@ -1215,8 +1210,7 @@ void NodeManager::ProcessDisconnectClientMessage(
    local_queues_.RemoveDriverTaskId(TaskID::ComputeDriverTaskId(driver_id));
    worker_pool_.DisconnectDriver(worker);

-    RAY_LOG(DEBUG) << "Driver (pid=" << worker->Process().get()->id()
-                   << ") is disconnected. "
+    RAY_LOG(DEBUG) << "Driver (pid=" << worker->Pid() << ") is disconnected. "
                   << "job_id: " << job_id;
  }

@@ -2296,8 +2290,7 @@ void NodeManager::AssignTask(const std::shared_ptr<Worker> &worker, const Task &
  }

  RAY_LOG(DEBUG) << "Assigning task " << spec.TaskId() << " to worker with pid "
-                << worker->Process().get()->id()
-                << ", worker id: " << worker->WorkerId();
+                << worker->Pid() << ", worker id: " << worker->WorkerId();
  flatbuffers::FlatBufferBuilder fbb;

  // Resource accounting: acquire resources for the assigned task.
@@ -3128,7 +3121,7 @@ void NodeManager::HandleGetNodeStats(const rpc::GetNodeStatsRequest &request,
                                     rpc::SendReplyCallback send_reply_callback) {
  for (const auto &driver : worker_pool_.GetAllDrivers()) {
    auto worker_stats = reply->add_workers_stats();
-    worker_stats->set_pid(driver->Process().get()->id());
+    worker_stats->set_pid(driver->Pid());
    worker_stats->set_is_driver(true);
  }
  for (const auto task : local_queues_.GetTasks(TaskState::INFEASIBLE)) {
@@ -3191,7 +3184,7 @@ void NodeManager::HandleGetNodeStats(const rpc::GetNodeStatsRequest &request,
                       << status.ToString();
      } else {
        auto worker_stats = reply->add_workers_stats();
-        worker_stats->set_pid(worker->Process().get()->id());
+        worker_stats->set_pid(worker->Pid());
        worker_stats->set_is_driver(false);
        reply->set_num_workers(reply->num_workers() + 1);
        worker_stats->mutable_core_worker_stats()->MergeFrom(r.core_worker_stats());