Skip to content

[23088] Solve Discovery Server race conditions (backport #5780) #5808

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: 2.10.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 164 additions & 6 deletions src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -479,9 +479,6 @@ void DiscoveryDataBase::process_pdp_data_queue()
// Lock(exclusive mode) mutex locally
std::lock_guard<std::recursive_mutex> guard(mutex_);

// Swap DATA queues
pdp_data_queue_.Swap();

// Process all messages in the queque
while (!pdp_data_queue_.Empty())
{
Expand Down Expand Up @@ -519,10 +516,14 @@ bool DiscoveryDataBase::process_edp_data_queue()
// Lock(exclusive mode) mutex locally
std::lock_guard<std::recursive_mutex> guard(mutex_);

<<<<<<< HEAD
// Swap DATA queues
edp_data_queue_.Swap();

eprosima::fastrtps::rtps::CacheChange_t* change;
=======
eprosima::fastdds::rtps::CacheChange_t* change;
>>>>>>> ec666f72 (Solve Discovery Server race conditions (#5780))
std::string topic_name;

// Process all messages in the queque
Expand Down Expand Up @@ -603,6 +604,52 @@ void DiscoveryDataBase::match_new_server_(
assert(our_data_it != participants_.end());
add_pdp_to_send_(our_data_it->second.change());

<<<<<<< HEAD
=======
if (!is_superclient)
{
// To obtain a mesh topology with servers, we need to:
// - Make all known servers relevant to the new server
// - Make the new server relevant to all known servers
// - Send DATA(p) of all known servers to the new server
// - Send Data(p) of the new server to all other servers
for (auto& part : participants_)
{
if (part.first != server_guid_prefix_ && !part.second.is_client() && !part.second.is_superclient())
{
if (part.first == participant_prefix)
{
std::lock_guard<std::recursive_mutex> guard(mutex_);
bool resend_new_pdp = false;
for (auto& server: servers_)
{
if (server != participant_prefix)
{
// Make all known servers relevant to the new server, but not matched
part.second.add_or_update_ack_participant(server, ParticipantState::PENDING_SEND);
resend_new_pdp = true;
}
}
if (resend_new_pdp)
{
// Send DATA(p) of the new server to all other servers.
add_pdp_to_send_(part.second.change());
}
}
else
{
// Make the new server relevant to all known servers
part.second.add_or_update_ack_participant(participant_prefix, ParticipantState::PENDING_SEND);
// Send DATA(p) of all known servers to the new participant
add_pdp_to_send_(part.second.change());
}
}
}
}
// The resources needed for TCP new connections are created during the matching process when the
// DATA(p) is received by each server.

>>>>>>> ec666f72 (Solve Discovery Server race conditions (#5780))
// Create virtual endpoints
create_virtual_endpoints_(participant_prefix);
}
Expand Down Expand Up @@ -728,32 +775,81 @@ void DiscoveryDataBase::update_participant_from_change_(
{
fastrtps::rtps::GUID_t change_guid = guid_from_change(ch);

assert(ch->kind == eprosima::fastdds::rtps::ALIVE);

// If the change corresponds to a previously removed participant (which hasn't yet been removed from the map since
// the DATA(Up) is still unacked), update map with new data and behave as if it was a new participant.
// Remove also the old change from the disposals collection, if it was added just before
if (participant_info.change()->kind != eprosima::fastdds::rtps::ALIVE)
{
// Update the change data
participant_info.participant_change_data(change_data);

// Remove old change from disposals if it was added just before to avoid sending data UP
auto it = std::find(disposals_.begin(), disposals_.end(), participant_info.change());
if (it != disposals_.end())
{
disposals_.erase(it);
}

// Update change. This should add the UNALIVE change to changes_to_release_, which should later both remove the
// change from the writer's history and release the change
update_change_and_unmatch_(ch, participant_info);

// If it is local and server we have to create virtual endpoints, except for our own server
if (change_guid.guidPrefix != server_guid_prefix_ && !change_data.is_client() && change_data.is_local())
{
// Match new server and create virtual endpoints
// NOTE: match after having updated the change, so virtual endpoints are not discarded for having
// an associated unalive participant
match_new_server_(change_guid.guidPrefix, change_data.is_superclient());
}

// Treat as a new participant found
new_updates_++;
if (change_guid.guidPrefix != server_guid_prefix_)
{
server_acked_by_all(false);
}
}

// Specific case when a Data(P) from an entity A known as remote comes from the very entity A (we have
// the Data(P) because of other server B, but now it arrives from A itself)
// The entity A changes to local
// Must be local data, or else it is a remote endpoint and should not be changed
if (change_guid.guidPrefix != server_guid_prefix_ && change_data.is_local() &&
else if (change_guid.guidPrefix != server_guid_prefix_ && change_data.is_local() &&
DiscoveryDataBase::participant_data_has_changed_(participant_info, change_data))
{
<<<<<<< HEAD
// If the participant changes to server local, virtual endpoints must be added
// If it is local and server the only possibility is it was a remote server and it must be converted to local
if (!change_data.is_client())
{
match_new_server_(change_guid.guidPrefix);
}

=======
>>>>>>> ec666f72 (Solve Discovery Server race conditions (#5780))
// Update the change data
participant_info.participant_change_data(change_data);

// Update change
update_change_and_unmatch_(ch, participant_info);

// If the participant changes to server local, virtual endpoints must be added
// If it is local and server the only possibility is it was a remote server and it must be converted to local
if (!change_data.is_client())
{
// NOTE: match after having updated the change in order to send the new Data(P)
match_new_server_(change_guid.guidPrefix, change_data.is_superclient());
}

// Treat as a new participant found
new_updates_++;
server_acked_by_all(false);

// It is possible that this Data(P) is in our history if it has not been acked by all
// In this case we have to resent it with the new update
// In this case we have to resend it with the new update
if (!participant_info.is_acked_by_all())
{
add_pdp_to_send_(ch);
Expand Down Expand Up @@ -856,6 +952,29 @@ void DiscoveryDataBase::create_writers_from_change_(
// The writer was NOT known by the database
else
{
// Check if corresponding participant is known, abort otherwise
// NOTE: Processing a DATA(w) should always be preceded by the reception and processing of its corresponding
// participant. However, one may receive a DATA(w) just after the participant has been removed, case in which the
// former should no longer be processed.
std::map<eprosima::fastdds::rtps::GuidPrefix_t, DiscoveryParticipantInfo>::iterator writer_part_it =
participants_.find(writer_guid.guidPrefix);
if (writer_part_it == participants_.end())
{
EPROSIMA_LOG_ERROR(DISCOVERY_DATABASE,
"Writer " << writer_guid << " has no associated participant. Skipping");
assert(topic_name != virtual_topic_);
changes_to_release_.push_back(ch); // Release change so it can be reused
return;
}
else if (writer_part_it->second.change()->kind != fastdds::rtps::ChangeKind_t::ALIVE)
{
EPROSIMA_LOG_WARNING(DISCOVERY_DATABASE,
"Writer " << writer_guid << " is associated to a removed participant. Skipping");
assert(topic_name != virtual_topic_);
changes_to_release_.push_back(ch); // Release change so it can be reused
return;
}

// Add entry to writers_
DiscoveryEndpointInfo tmp_writer(
ch,
Expand All @@ -876,6 +995,7 @@ void DiscoveryDataBase::create_writers_from_change_(
new_updates_++;

// Add entry to participants_[guid_prefix]::writers
<<<<<<< HEAD
std::map<eprosima::fastrtps::rtps::GuidPrefix_t, DiscoveryParticipantInfo>::iterator writer_part_it =
participants_.find(writer_guid.guidPrefix);
if (writer_part_it != participants_.end())
Expand All @@ -888,6 +1008,9 @@ void DiscoveryDataBase::create_writers_from_change_(
"Writer " << writer_guid << " has no associated participant. Skipping");
return;
}
=======
writer_part_it->second.add_writer(writer_guid);
>>>>>>> ec666f72 (Solve Discovery Server race conditions (#5780))

// Add writer to writers_by_topic_[topic_name]
add_writer_to_topic_(writer_guid, topic_name);
Expand Down Expand Up @@ -974,6 +1097,29 @@ void DiscoveryDataBase::create_readers_from_change_(
// The reader was NOT known by the database
else
{
// Check if corresponding participant is known, abort otherwise
// NOTE: Processing a DATA(r) should always be preceded by the reception and processing of its corresponding
// participant. However, one may receive a DATA(r) just after the participant has been removed, case in which the
// former should no longer be processed.
std::map<eprosima::fastdds::rtps::GuidPrefix_t, DiscoveryParticipantInfo>::iterator reader_part_it =
participants_.find(reader_guid.guidPrefix);
if (reader_part_it == participants_.end())
{
EPROSIMA_LOG_ERROR(DISCOVERY_DATABASE,
"Reader " << reader_guid << " has no associated participant. Skipping");
assert(topic_name != virtual_topic_);
changes_to_release_.push_back(ch); // Release change so it can be reused
return;
}
else if (reader_part_it->second.change()->kind != fastdds::rtps::ChangeKind_t::ALIVE)
{
EPROSIMA_LOG_WARNING(DISCOVERY_DATABASE,
"Reader " << reader_guid << " is associated to a removed participant. Skipping");
assert(topic_name != virtual_topic_);
changes_to_release_.push_back(ch); // Release change so it can be reused
return;
}

// Add entry to readers_
DiscoveryEndpointInfo tmp_reader(
ch,
Expand All @@ -994,6 +1140,7 @@ void DiscoveryDataBase::create_readers_from_change_(
new_updates_++;

// Add entry to participants_[guid_prefix]::readers
<<<<<<< HEAD
std::map<eprosima::fastrtps::rtps::GuidPrefix_t, DiscoveryParticipantInfo>::iterator reader_part_it =
participants_.find(reader_guid.guidPrefix);
if (reader_part_it != participants_.end())
Expand All @@ -1006,6 +1153,9 @@ void DiscoveryDataBase::create_readers_from_change_(
"Reader " << reader_guid << " has no associated participant. Skipping");
return;
}
=======
reader_part_it->second.add_reader(reader_guid);
>>>>>>> ec666f72 (Solve Discovery Server race conditions (#5780))

// Add reader to readers_by_topic_[topic_name]
add_reader_to_topic_(reader_guid, topic_name);
Expand Down Expand Up @@ -1285,7 +1435,7 @@ void DiscoveryDataBase::process_dispose_participant_(
delete_reader_entity_(reader_guid);
}

// All participant endoints must be already unmatched in others endopoints relevant_ack maps
// All participant endpoints must be already unmatched in others endpoints relevant_ack maps

// Unmatch own participant
unmatch_participant_(participant_guid.guidPrefix);
Expand Down Expand Up @@ -1542,6 +1692,14 @@ bool DiscoveryDataBase::data_queue_empty()
return (pdp_data_queue_.BothEmpty() && edp_data_queue_.BothEmpty());
}

void DiscoveryDataBase::swap_data_queues()
{
// Swap EDP before PDP to avoid race condition in which both data P and w/r are received at the same time,
// just after having swapped the PDP queue
edp_data_queue_.Swap();
pdp_data_queue_.Swap();
}

bool DiscoveryDataBase::is_participant(
const eprosima::fastrtps::rtps::GUID_t& guid)
{
Expand Down
3 changes: 3 additions & 0 deletions src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,9 @@ class DiscoveryDataBase
// Check if the data queue is empty
bool data_queue_empty();

// Swap both EDP and PDP data queues
void swap_data_queues();

void to_json(
nlohmann::json& j) const;

Expand Down
3 changes: 3 additions & 0 deletions src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1098,6 +1098,9 @@ bool PDPServer::remove_remote_participant(
bool PDPServer::process_data_queues()
{
EPROSIMA_LOG_INFO(RTPS_PDP_SERVER, "process_data_queues start");
// Swap both as a first step in order to avoid the following race condition: reception of data w/r while processing
// the PDP queue, not having processed yet the corresponding data P (also received while processing the queue).
discovery_db_.swap_data_queues();
discovery_db_.process_pdp_data_queue();
return discovery_db_.process_edp_data_queue();
}
Expand Down
Loading