Skip to content

Commit c489c7d

Browse files
juanlofer-eprosimacferreiragonz
authored andcommitted
Solve Discovery Server race conditions (#5780)
* Refs #23088: Test reconnection when removing participant Signed-off-by: cferreiragonz <[email protected]> * Refs #23088: Solve EDP-PDP queues race condition Signed-off-by: Juan Lopez Fernandez <[email protected]> * Refs #23088: Solve data UP + data P race condition Signed-off-by: Juan Lopez Fernandez <[email protected]> * Refs #23088: Abort writer/reader processing if associated participant not alive Signed-off-by: Juan Lopez Fernandez <[email protected]> * Refs #23088: Apply suggestions Signed-off-by: Juan Lopez Fernandez <[email protected]> * Refs #23088: Release change when writer/reader insertion in DB failed Signed-off-by: Juan Lopez Fernandez <[email protected]> * Refs #23088: Match servers after change update Signed-off-by: Juan Lopez Fernandez <[email protected]> --------- Signed-off-by: cferreiragonz <[email protected]> Signed-off-by: Juan Lopez Fernandez <[email protected]> Co-authored-by: cferreiragonz <[email protected]>
1 parent ab6f2d7 commit c489c7d

File tree

4 files changed

+264
-40
lines changed

4 files changed

+264
-40
lines changed

src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.cpp

Lines changed: 105 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -494,9 +494,6 @@ void DiscoveryDataBase::process_pdp_data_queue()
494494
// Lock(exclusive mode) mutex locally
495495
std::lock_guard<std::recursive_mutex> guard(mutex_);
496496

497-
// Swap DATA queues
498-
pdp_data_queue_.Swap();
499-
500497
// Process all messages in the queque
501498
while (!pdp_data_queue_.Empty())
502499
{
@@ -534,9 +531,6 @@ bool DiscoveryDataBase::process_edp_data_queue()
534531
// Lock(exclusive mode) mutex locally
535532
std::lock_guard<std::recursive_mutex> guard(mutex_);
536533

537-
// Swap DATA queues
538-
edp_data_queue_.Swap();
539-
540534
eprosima::fastdds::rtps::CacheChange_t* change;
541535
std::string topic_name;
542536

@@ -660,7 +654,7 @@ void DiscoveryDataBase::match_new_server_(
660654
}
661655
}
662656
// The resources needed for TCP new connections are created during the matching process when the
663-
// DATA(p) is receieved by each server.
657+
// DATA(p) is received by each server.
664658

665659
// Create virtual endpoints
666660
create_virtual_endpoints_(participant_prefix);
@@ -788,32 +782,71 @@ void DiscoveryDataBase::update_participant_from_change_(
788782
{
789783
fastdds::rtps::GUID_t change_guid = guid_from_change(ch);
790784

785+
assert(ch->kind == eprosima::fastdds::rtps::ALIVE);
786+
787+
// If the change corresponds to a previously removed participant (which hasn't yet been removed from the map since
788+
// the DATA(Up) is still unacked), update map with new data and behave as if it was a new participant.
789+
// Remove also the old change from the disposals collection, if it was added just before
790+
if (participant_info.change()->kind != eprosima::fastdds::rtps::ALIVE)
791+
{
792+
// Update the change data
793+
participant_info.participant_change_data(change_data);
794+
795+
// Remove old change from disposals if it was added just before to avoid sending data UP
796+
auto it = std::find(disposals_.begin(), disposals_.end(), participant_info.change());
797+
if (it != disposals_.end())
798+
{
799+
disposals_.erase(it);
800+
}
801+
802+
// Update change. This should add the UNALIVE change to changes_to_release_, which should later both remove the
803+
// change from the writer's history and release the change
804+
update_change_and_unmatch_(ch, participant_info);
805+
806+
// If it is local and server we have to create virtual endpoints, except for our own server
807+
if (change_guid.guidPrefix != server_guid_prefix_ && !change_data.is_client() && change_data.is_local())
808+
{
809+
// Match new server and create virtual endpoints
810+
// NOTE: match after having updated the change, so virtual endpoints are not discarded for having
811+
// an associated unalive participant
812+
match_new_server_(change_guid.guidPrefix, change_data.is_superclient());
813+
}
814+
815+
// Treat as a new participant found
816+
new_updates_++;
817+
if (change_guid.guidPrefix != server_guid_prefix_)
818+
{
819+
server_acked_by_all(false);
820+
}
821+
}
822+
791823
// Specific case when a Data(P) from an entity A known as remote comes from the very entity A (we have
792824
// the Data(P) because of other server B, but now it arrives from A itself)
793825
// The entity A changes to local
794826
// Must be local data, or else it is a remote endpoint and should not be changed
795-
if (change_guid.guidPrefix != server_guid_prefix_ && change_data.is_local() &&
827+
else if (change_guid.guidPrefix != server_guid_prefix_ && change_data.is_local() &&
796828
DiscoveryDataBase::participant_data_has_changed_(participant_info, change_data))
797829
{
830+
// Update the change data
831+
participant_info.participant_change_data(change_data);
832+
833+
// Update change
834+
update_change_and_unmatch_(ch, participant_info);
835+
798836
// If the participant changes to server local, virtual endpoints must be added
799837
// If it is local and server the only possibility is it was a remote server and it must be converted to local
800838
if (!change_data.is_client())
801839
{
840+
// NOTE: match after having updated the change in order to send the new Data(P)
802841
match_new_server_(change_guid.guidPrefix, change_data.is_superclient());
803842
}
804843

805-
// Update the change data
806-
participant_info.participant_change_data(change_data);
807-
808-
// Update change
809-
update_change_and_unmatch_(ch, participant_info);
810-
811844
// Treat as a new participant found
812845
new_updates_++;
813846
server_acked_by_all(false);
814847

815848
// It is possible that this Data(P) is in our history if it has not been acked by all
816-
// In this case we have to resent it with the new update
849+
// In this case we have to resend it with the new update
817850
if (!participant_info.is_acked_by_all())
818851
{
819852
add_pdp_to_send_(ch);
@@ -916,6 +949,29 @@ void DiscoveryDataBase::create_writers_from_change_(
916949
// The writer was NOT known by the database
917950
else
918951
{
952+
// Check if corresponding participant is known, abort otherwise
953+
// NOTE: Processing a DATA(w) should always be preceded by the reception and processing of its corresponding
954+
// participant. However, one may receive a DATA(w) just after the participant has been removed, case in which the
955+
// former should no longer be processed.
956+
std::map<eprosima::fastdds::rtps::GuidPrefix_t, DiscoveryParticipantInfo>::iterator writer_part_it =
957+
participants_.find(writer_guid.guidPrefix);
958+
if (writer_part_it == participants_.end())
959+
{
960+
EPROSIMA_LOG_ERROR(DISCOVERY_DATABASE,
961+
"Writer " << writer_guid << " has no associated participant. Skipping");
962+
assert(topic_name != virtual_topic_);
963+
changes_to_release_.push_back(ch); // Release change so it can be reused
964+
return;
965+
}
966+
else if (writer_part_it->second.change()->kind != fastdds::rtps::ChangeKind_t::ALIVE)
967+
{
968+
EPROSIMA_LOG_WARNING(DISCOVERY_DATABASE,
969+
"Writer " << writer_guid << " is associated to a removed participant. Skipping");
970+
assert(topic_name != virtual_topic_);
971+
changes_to_release_.push_back(ch); // Release change so it can be reused
972+
return;
973+
}
974+
919975
// Add entry to writers_
920976
DiscoveryEndpointInfo tmp_writer(
921977
ch,
@@ -936,18 +992,7 @@ void DiscoveryDataBase::create_writers_from_change_(
936992
new_updates_++;
937993

938994
// Add entry to participants_[guid_prefix]::writers
939-
std::map<eprosima::fastdds::rtps::GuidPrefix_t, DiscoveryParticipantInfo>::iterator writer_part_it =
940-
participants_.find(writer_guid.guidPrefix);
941-
if (writer_part_it != participants_.end())
942-
{
943-
writer_part_it->second.add_writer(writer_guid);
944-
}
945-
else
946-
{
947-
EPROSIMA_LOG_ERROR(DISCOVERY_DATABASE,
948-
"Writer " << writer_guid << " has no associated participant. Skipping");
949-
return;
950-
}
995+
writer_part_it->second.add_writer(writer_guid);
951996

952997
// Add writer to writers_by_topic_[topic_name]
953998
add_writer_to_topic_(writer_guid, topic_name);
@@ -1034,6 +1079,29 @@ void DiscoveryDataBase::create_readers_from_change_(
10341079
// The reader was NOT known by the database
10351080
else
10361081
{
1082+
// Check if corresponding participant is known, abort otherwise
1083+
// NOTE: Processing a DATA(r) should always be preceded by the reception and processing of its corresponding
1084+
// participant. However, one may receive a DATA(r) just after the participant has been removed, case in which the
1085+
// former should no longer be processed.
1086+
std::map<eprosima::fastdds::rtps::GuidPrefix_t, DiscoveryParticipantInfo>::iterator reader_part_it =
1087+
participants_.find(reader_guid.guidPrefix);
1088+
if (reader_part_it == participants_.end())
1089+
{
1090+
EPROSIMA_LOG_ERROR(DISCOVERY_DATABASE,
1091+
"Reader " << reader_guid << " has no associated participant. Skipping");
1092+
assert(topic_name != virtual_topic_);
1093+
changes_to_release_.push_back(ch); // Release change so it can be reused
1094+
return;
1095+
}
1096+
else if (reader_part_it->second.change()->kind != fastdds::rtps::ChangeKind_t::ALIVE)
1097+
{
1098+
EPROSIMA_LOG_WARNING(DISCOVERY_DATABASE,
1099+
"Reader " << reader_guid << " is associated to a removed participant. Skipping");
1100+
assert(topic_name != virtual_topic_);
1101+
changes_to_release_.push_back(ch); // Release change so it can be reused
1102+
return;
1103+
}
1104+
10371105
// Add entry to readers_
10381106
DiscoveryEndpointInfo tmp_reader(
10391107
ch,
@@ -1054,18 +1122,7 @@ void DiscoveryDataBase::create_readers_from_change_(
10541122
new_updates_++;
10551123

10561124
// Add entry to participants_[guid_prefix]::readers
1057-
std::map<eprosima::fastdds::rtps::GuidPrefix_t, DiscoveryParticipantInfo>::iterator reader_part_it =
1058-
participants_.find(reader_guid.guidPrefix);
1059-
if (reader_part_it != participants_.end())
1060-
{
1061-
reader_part_it->second.add_reader(reader_guid);
1062-
}
1063-
else
1064-
{
1065-
EPROSIMA_LOG_ERROR(DISCOVERY_DATABASE,
1066-
"Reader " << reader_guid << " has no associated participant. Skipping");
1067-
return;
1068-
}
1125+
reader_part_it->second.add_reader(reader_guid);
10691126

10701127
// Add reader to readers_by_topic_[topic_name]
10711128
add_reader_to_topic_(reader_guid, topic_name);
@@ -1347,7 +1404,7 @@ void DiscoveryDataBase::process_dispose_participant_(
13471404
delete_reader_entity_(reader_guid);
13481405
}
13491406

1350-
// All participant endoints must be already unmatched in others endopoints relevant_ack maps
1407+
// All participant endpoints must be already unmatched in others endpoints relevant_ack maps
13511408

13521409
// Unmatch own participant
13531410
unmatch_participant_(participant_guid.guidPrefix);
@@ -1618,6 +1675,14 @@ bool DiscoveryDataBase::data_queue_empty()
16181675
return (pdp_data_queue_.BothEmpty() && edp_data_queue_.BothEmpty());
16191676
}
16201677

1678+
void DiscoveryDataBase::swap_data_queues()
1679+
{
1680+
// Swap EDP before PDP to avoid race condition in which both data P and w/r are received at the same time,
1681+
// just after having swapped the PDP queue
1682+
edp_data_queue_.Swap();
1683+
pdp_data_queue_.Swap();
1684+
}
1685+
16211686
bool DiscoveryDataBase::is_participant(
16221687
const eprosima::fastdds::rtps::GUID_t& guid)
16231688
{

src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,9 @@ class DiscoveryDataBase
294294
// Check if the data queue is empty
295295
bool data_queue_empty();
296296

297+
// Swap both EDP and PDP data queues
298+
void swap_data_queues();
299+
297300
void to_json(
298301
nlohmann::json& j) const;
299302

src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1080,6 +1080,9 @@ bool PDPServer::remove_remote_participant(
10801080
bool PDPServer::process_data_queues()
10811081
{
10821082
EPROSIMA_LOG_INFO(RTPS_PDP_SERVER, "process_data_queues start");
1083+
// Swap both as a first step in order to avoid the following race condition: reception of data w/r while processing
1084+
// the PDP queue, not having processed yet the corresponding data P (also received while processing the queue).
1085+
discovery_db_.swap_data_queues();
10831086
discovery_db_.process_pdp_data_queue();
10841087
return discovery_db_.process_edp_data_queue();
10851088
}

0 commit comments

Comments
 (0)