@@ -795,14 +795,14 @@ void NodeManager::HandleDisconnectedActor(const ActorID &actor_id, bool was_loca
795
795
}
796
796
}
797
797
// Update the actor's state.
798
- ActorTableData new_actor_info = actor_entry->second .GetTableData ();
799
- new_actor_info .set_state (new_state);
798
+ ActorTableData new_actor_data = actor_entry->second .GetTableData ();
799
+ new_actor_data .set_state (new_state);
800
800
if (was_local) {
801
801
// If the actor was local, immediately update the state in actor registry.
802
802
// So if we receive any actor tasks before we receive GCS notification,
803
803
// these tasks can be correctly routed to the `MethodsWaitingForActorCreation`
804
804
// queue, instead of being assigned to the dead actor.
805
- HandleActorStateTransition (actor_id, ActorRegistration (new_actor_info ));
805
+ HandleActorStateTransition (actor_id, ActorRegistration (new_actor_data ));
806
806
}
807
807
808
808
auto done = [was_local, actor_id](Status status) {
@@ -812,7 +812,7 @@ void NodeManager::HandleDisconnectedActor(const ActorID &actor_id, bool was_loca
812
812
RAY_LOG (FATAL) << " Failed to update state for actor " << actor_id;
813
813
}
814
814
};
815
- auto actor_notification = std::make_shared<ActorTableData>(new_actor_info );
815
+ auto actor_notification = std::make_shared<ActorTableData>(new_actor_data );
816
816
RAY_CHECK_OK (gcs_client_->Actors ().AsyncUpdate (actor_id, actor_notification, done));
817
817
}
818
818
@@ -1826,46 +1826,45 @@ void NodeManager::FinishAssignedTask(Worker &worker) {
1826
1826
}
1827
1827
}
1828
1828
1829
- std::shared_ptr< ActorTableData> NodeManager::CreateActorTableDataFromCreationTask (
1829
+ ActorTableData NodeManager::CreateActorTableDataFromCreationTask (
1830
1830
const TaskSpecification &task_spec) {
1831
1831
RAY_CHECK (task_spec.IsActorCreationTask ());
1832
1832
auto actor_id = task_spec.ActorCreationId ();
1833
1833
auto actor_entry = actor_registry_.find (actor_id);
1834
- std::shared_ptr< ActorTableData> actor_info_ptr ;
1834
+ ActorTableData new_actor_data ;
1835
1835
// TODO(swang): If this is an actor that was reconstructed, and previous
1836
1836
// actor notifications were delayed, then this node may not have an entry for
1837
1837
// the actor in actor_regisry_. Then, the fields for the number of
1838
1838
// reconstructions will be wrong.
1839
1839
if (actor_entry == actor_registry_.end ()) {
1840
- actor_info_ptr.reset (new ActorTableData ());
1841
1840
// Set all of the static fields for the actor. These fields will not
1842
1841
// change even if the actor fails or is reconstructed.
1843
- actor_info_ptr-> set_actor_id (actor_id.Binary ());
1844
- actor_info_ptr-> set_actor_creation_dummy_object_id (
1842
+ new_actor_data. set_actor_id (actor_id.Binary ());
1843
+ new_actor_data. set_actor_creation_dummy_object_id (
1845
1844
task_spec.ActorDummyObject ().Binary ());
1846
- actor_info_ptr-> set_job_id (task_spec.JobId ().Binary ());
1847
- actor_info_ptr-> set_max_reconstructions (task_spec.MaxActorReconstructions ());
1845
+ new_actor_data. set_job_id (task_spec.JobId ().Binary ());
1846
+ new_actor_data. set_max_reconstructions (task_spec.MaxActorReconstructions ());
1848
1847
// This is the first time that the actor has been created, so the number
1849
1848
// of remaining reconstructions is the max.
1850
- actor_info_ptr-> set_remaining_reconstructions (task_spec.MaxActorReconstructions ());
1849
+ new_actor_data. set_remaining_reconstructions (task_spec.MaxActorReconstructions ());
1851
1850
} else {
1852
1851
// If we've already seen this actor, it means that this actor was reconstructed.
1853
1852
// Thus, its previous state must be RECONSTRUCTING.
1854
1853
RAY_CHECK (actor_entry->second .GetState () == ActorTableData::RECONSTRUCTING);
1855
1854
// Copy the static fields from the current actor entry.
1856
- actor_info_ptr. reset ( new ActorTableData ( actor_entry->second .GetTableData ()) );
1855
+ new_actor_data = actor_entry->second .GetTableData ();
1857
1856
// We are reconstructing the actor, so subtract its
1858
1857
// remaining_reconstructions by 1.
1859
- actor_info_ptr-> set_remaining_reconstructions (
1860
- actor_info_ptr-> remaining_reconstructions () - 1 );
1858
+ new_actor_data. set_remaining_reconstructions (
1859
+ new_actor_data. remaining_reconstructions () - 1 );
1861
1860
}
1862
1861
1863
1862
// Set the new fields for the actor's state to indicate that the actor is
1864
1863
// now alive on this node manager.
1865
- actor_info_ptr-> set_node_manager_id (
1864
+ new_actor_data. set_node_manager_id (
1866
1865
gcs_client_->client_table ().GetLocalClientId ().Binary ());
1867
- actor_info_ptr-> set_state (ActorTableData::ALIVE);
1868
- return actor_info_ptr ;
1866
+ new_actor_data. set_state (ActorTableData::ALIVE);
1867
+ return new_actor_data ;
1869
1868
}
1870
1869
1871
1870
void NodeManager::FinishAssignedActorTask (Worker &worker, const Task &task) {
@@ -1971,8 +1970,8 @@ void NodeManager::FinishAssignedActorCreationTask(const ActorID &parent_actor_id
1971
1970
bool resumed_from_checkpoint) {
1972
1971
// Notify the other node managers that the actor has been created.
1973
1972
const ActorID actor_id = task_spec.ActorCreationId ();
1974
- auto new_actor_info = CreateActorTableDataFromCreationTask (task_spec);
1975
- new_actor_info-> set_parent_actor_id (parent_actor_id.Binary ());
1973
+ auto new_actor_data = CreateActorTableDataFromCreationTask (task_spec);
1974
+ new_actor_data. set_parent_actor_id (parent_actor_id.Binary ());
1976
1975
auto update_callback = [actor_id](Status status) {
1977
1976
if (!status.ok ()) {
1978
1977
// Only one node at a time should succeed at creating or updating the actor.
@@ -1990,20 +1989,21 @@ void NodeManager::FinishAssignedActorCreationTask(const ActorID &parent_actor_id
1990
1989
<< actor_id;
1991
1990
RAY_CHECK_OK (gcs_client_->actor_checkpoint_table ().Lookup (
1992
1991
JobID::Nil (), checkpoint_id,
1993
- [this , actor_id, new_actor_info , update_callback](
1992
+ [this , actor_id, new_actor_data , update_callback](
1994
1993
ray::gcs::RedisGcsClient *client, const UniqueID &checkpoint_id,
1995
1994
const ActorCheckpointData &checkpoint_data) {
1996
1995
RAY_LOG (INFO) << " Restoring registration for actor " << actor_id
1997
1996
<< " from checkpoint " << checkpoint_id;
1998
1997
ActorRegistration actor_registration =
1999
- ActorRegistration (*new_actor_info , checkpoint_data);
1998
+ ActorRegistration (new_actor_data , checkpoint_data);
2000
1999
// Mark the unreleased dummy objects in the checkpoint frontier as local.
2001
2000
for (const auto &entry : actor_registration.GetDummyObjects ()) {
2002
2001
HandleObjectLocal (entry.first );
2003
2002
}
2004
2003
HandleActorStateTransition (actor_id, std::move (actor_registration));
2004
+ auto actor_notification = std::make_shared<ActorTableData>(new_actor_data);
2005
2005
// The actor was created before.
2006
- RAY_CHECK_OK (gcs_client_->Actors ().AsyncUpdate (actor_id, new_actor_info ,
2006
+ RAY_CHECK_OK (gcs_client_->Actors ().AsyncUpdate (actor_id, actor_notification ,
2007
2007
update_callback));
2008
2008
},
2009
2009
[actor_id](ray::gcs::RedisGcsClient *client, const UniqueID &checkpoint_id) {
@@ -2013,14 +2013,16 @@ void NodeManager::FinishAssignedActorCreationTask(const ActorID &parent_actor_id
2013
2013
} else {
2014
2014
// The actor did not resume from a checkpoint. Immediately notify the
2015
2015
// other node managers that the actor has been created.
2016
- HandleActorStateTransition (actor_id, ActorRegistration (*new_actor_info));
2016
+ HandleActorStateTransition (actor_id, ActorRegistration (new_actor_data));
2017
+ auto actor_notification = std::make_shared<ActorTableData>(new_actor_data);
2017
2018
if (actor_registry_.find (actor_id) != actor_registry_.end ()) {
2018
2019
// The actor was created before.
2019
- RAY_CHECK_OK (
2020
- gcs_client_-> Actors (). AsyncUpdate (actor_id, new_actor_info, update_callback));
2020
+ RAY_CHECK_OK (gcs_client_-> Actors (). AsyncUpdate (actor_id, actor_notification,
2021
+ update_callback));
2021
2022
} else {
2022
2023
// The actor was never created before.
2023
- RAY_CHECK_OK (gcs_client_->Actors ().AsyncRegister (new_actor_info, update_callback));
2024
+ RAY_CHECK_OK (
2025
+ gcs_client_->Actors ().AsyncRegister (actor_notification, update_callback));
2024
2026
}
2025
2027
}
2026
2028
if (!resumed_from_checkpoint) {
0 commit comments