From c489c7d8f892ede5f4551d7ecae4b156bfabceb0 Mon Sep 17 00:00:00 2001 From: juanlofer-eprosima <88179026+juanlofer-eprosima@users.noreply.github.com> Date: Thu, 8 May 2025 07:32:16 +0200 Subject: [PATCH] Solve Discovery Server race conditions (#5780) * Refs #23088: Test reconnection when removing participant Signed-off-by: cferreiragonz * Refs #23088: Solve EDP-PDP queues race condition Signed-off-by: Juan Lopez Fernandez * Refs #23088: Solve data UP + data P race condition Signed-off-by: Juan Lopez Fernandez * Refs #23088: Abort writer/reader processing if associated participant not alive Signed-off-by: Juan Lopez Fernandez * Refs #23088: Apply suggestions Signed-off-by: Juan Lopez Fernandez * Refs #23088: Release change when writer/reader insertion in DB failed Signed-off-by: Juan Lopez Fernandez * Refs #23088: Match servers after change update Signed-off-by: Juan Lopez Fernandez --------- Signed-off-by: cferreiragonz Signed-off-by: Juan Lopez Fernandez Co-authored-by: cferreiragonz --- .../discovery/database/DiscoveryDataBase.cpp | 145 ++++++++++++----- .../discovery/database/DiscoveryDataBase.hpp | 3 + .../discovery/participant/PDPServer.cpp | 3 + .../common/BlackboxTestsDiscovery.cpp | 153 ++++++++++++++++++ 4 files changed, 264 insertions(+), 40 deletions(-) diff --git a/src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.cpp b/src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.cpp index ad01fdce3bd..1f0c1908370 100644 --- a/src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.cpp +++ b/src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.cpp @@ -494,9 +494,6 @@ void DiscoveryDataBase::process_pdp_data_queue() // Lock(exclusive mode) mutex locally std::lock_guard guard(mutex_); - // Swap DATA queues - pdp_data_queue_.Swap(); - // Process all messages in the queque while (!pdp_data_queue_.Empty()) { @@ -534,9 +531,6 @@ bool DiscoveryDataBase::process_edp_data_queue() // Lock(exclusive mode) mutex locally std::lock_guard guard(mutex_); - 
// Swap DATA queues - edp_data_queue_.Swap(); - eprosima::fastdds::rtps::CacheChange_t* change; std::string topic_name; @@ -660,7 +654,7 @@ void DiscoveryDataBase::match_new_server_( } } // The resources needed for TCP new connections are created during the matching process when the - // DATA(p) is receieved by each server. + // DATA(p) is received by each server. // Create virtual endpoints create_virtual_endpoints_(participant_prefix); @@ -788,32 +782,71 @@ void DiscoveryDataBase::update_participant_from_change_( { fastdds::rtps::GUID_t change_guid = guid_from_change(ch); + assert(ch->kind == eprosima::fastdds::rtps::ALIVE); + + // If the change corresponds to a previously removed participant (which hasn't yet been removed from the map since + // the DATA(Up) is still unacked), update map with new data and behave as if it was a new participant. + // Remove also the old change from the disposals collection, if it was added just before + if (participant_info.change()->kind != eprosima::fastdds::rtps::ALIVE) + { + // Update the change data + participant_info.participant_change_data(change_data); + + // Remove old change from disposals if it was added just before to avoid sending data UP + auto it = std::find(disposals_.begin(), disposals_.end(), participant_info.change()); + if (it != disposals_.end()) + { + disposals_.erase(it); + } + + // Update change. 
This should add the UNALIVE change to changes_to_release_, which should later both remove the + // change from the writer's history and release the change + update_change_and_unmatch_(ch, participant_info); + + // If it is local and server we have to create virtual endpoints, except for our own server + if (change_guid.guidPrefix != server_guid_prefix_ && !change_data.is_client() && change_data.is_local()) + { + // Match new server and create virtual endpoints + // NOTE: match after having updated the change, so virtual endpoints are not discarded for having + // an associated unalive participant + match_new_server_(change_guid.guidPrefix, change_data.is_superclient()); + } + + // Treat as a new participant found + new_updates_++; + if (change_guid.guidPrefix != server_guid_prefix_) + { + server_acked_by_all(false); + } + } + // Specific case when a Data(P) from an entity A known as remote comes from the very entity A (we have // the Data(P) because of other server B, but now it arrives from A itself) // The entity A changes to local // Must be local data, or else it is a remote endpoint and should not be changed - if (change_guid.guidPrefix != server_guid_prefix_ && change_data.is_local() && + else if (change_guid.guidPrefix != server_guid_prefix_ && change_data.is_local() && DiscoveryDataBase::participant_data_has_changed_(participant_info, change_data)) { + // Update the change data + participant_info.participant_change_data(change_data); + + // Update change + update_change_and_unmatch_(ch, participant_info); + // If the participant changes to server local, virtual endpoints must be added // If it is local and server the only possibility is it was a remote server and it must be converted to local if (!change_data.is_client()) { + // NOTE: match after having updated the change in order to send the new Data(P) match_new_server_(change_guid.guidPrefix, change_data.is_superclient()); } - // Update the change data - 
participant_info.participant_change_data(change_data); - - // Update change - update_change_and_unmatch_(ch, participant_info); - // Treat as a new participant found new_updates_++; server_acked_by_all(false); // It is possible that this Data(P) is in our history if it has not been acked by all - // In this case we have to resent it with the new update + // In this case we have to resend it with the new update if (!participant_info.is_acked_by_all()) { add_pdp_to_send_(ch); @@ -916,6 +949,29 @@ void DiscoveryDataBase::create_writers_from_change_( // The writer was NOT known by the database else { + // Check if corresponding participant is known, abort otherwise + // NOTE: Processing a DATA(w) should always be preceded by the reception and processing of its corresponding + // participant. However, one may receive a DATA(w) just after the participant has been removed, case in which the + // former should no longer be processed. + std::map::iterator writer_part_it = + participants_.find(writer_guid.guidPrefix); + if (writer_part_it == participants_.end()) + { + EPROSIMA_LOG_ERROR(DISCOVERY_DATABASE, + "Writer " << writer_guid << " has no associated participant. Skipping"); + assert(topic_name != virtual_topic_); + changes_to_release_.push_back(ch); // Release change so it can be reused + return; + } + else if (writer_part_it->second.change()->kind != fastdds::rtps::ChangeKind_t::ALIVE) + { + EPROSIMA_LOG_WARNING(DISCOVERY_DATABASE, + "Writer " << writer_guid << " is associated to a removed participant. 
Skipping"); + assert(topic_name != virtual_topic_); + changes_to_release_.push_back(ch); // Release change so it can be reused + return; + } + // Add entry to writers_ DiscoveryEndpointInfo tmp_writer( ch, @@ -936,18 +992,7 @@ void DiscoveryDataBase::create_writers_from_change_( new_updates_++; // Add entry to participants_[guid_prefix]::writers - std::map::iterator writer_part_it = - participants_.find(writer_guid.guidPrefix); - if (writer_part_it != participants_.end()) - { - writer_part_it->second.add_writer(writer_guid); - } - else - { - EPROSIMA_LOG_ERROR(DISCOVERY_DATABASE, - "Writer " << writer_guid << " has no associated participant. Skipping"); - return; - } + writer_part_it->second.add_writer(writer_guid); // Add writer to writers_by_topic_[topic_name] add_writer_to_topic_(writer_guid, topic_name); @@ -1034,6 +1079,29 @@ void DiscoveryDataBase::create_readers_from_change_( // The reader was NOT known by the database else { + // Check if corresponding participant is known, abort otherwise + // NOTE: Processing a DATA(r) should always be preceded by the reception and processing of its corresponding + // participant. However, one may receive a DATA(r) just after the participant has been removed, case in which the + // former should no longer be processed. + std::map::iterator reader_part_it = + participants_.find(reader_guid.guidPrefix); + if (reader_part_it == participants_.end()) + { + EPROSIMA_LOG_ERROR(DISCOVERY_DATABASE, + "Reader " << reader_guid << " has no associated participant. Skipping"); + assert(topic_name != virtual_topic_); + changes_to_release_.push_back(ch); // Release change so it can be reused + return; + } + else if (reader_part_it->second.change()->kind != fastdds::rtps::ChangeKind_t::ALIVE) + { + EPROSIMA_LOG_WARNING(DISCOVERY_DATABASE, + "Reader " << reader_guid << " is associated to a removed participant. 
Skipping"); + assert(topic_name != virtual_topic_); + changes_to_release_.push_back(ch); // Release change so it can be reused + return; + } + // Add entry to readers_ DiscoveryEndpointInfo tmp_reader( ch, @@ -1054,18 +1122,7 @@ void DiscoveryDataBase::create_readers_from_change_( new_updates_++; // Add entry to participants_[guid_prefix]::readers - std::map::iterator reader_part_it = - participants_.find(reader_guid.guidPrefix); - if (reader_part_it != participants_.end()) - { - reader_part_it->second.add_reader(reader_guid); - } - else - { - EPROSIMA_LOG_ERROR(DISCOVERY_DATABASE, - "Reader " << reader_guid << " has no associated participant. Skipping"); - return; - } + reader_part_it->second.add_reader(reader_guid); // Add reader to readers_by_topic_[topic_name] add_reader_to_topic_(reader_guid, topic_name); @@ -1347,7 +1404,7 @@ void DiscoveryDataBase::process_dispose_participant_( delete_reader_entity_(reader_guid); } - // All participant endoints must be already unmatched in others endopoints relevant_ack maps + // All participant endpoints must be already unmatched in others endpoints relevant_ack maps // Unmatch own participant unmatch_participant_(participant_guid.guidPrefix); @@ -1618,6 +1675,14 @@ bool DiscoveryDataBase::data_queue_empty() return (pdp_data_queue_.BothEmpty() && edp_data_queue_.BothEmpty()); } +void DiscoveryDataBase::swap_data_queues() +{ + // Swap EDP before PDP to avoid race condition in which both data P and w/r are received at the same time, + // just after having swapped the PDP queue + edp_data_queue_.Swap(); + pdp_data_queue_.Swap(); +} + bool DiscoveryDataBase::is_participant( const eprosima::fastdds::rtps::GUID_t& guid) { diff --git a/src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.hpp b/src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.hpp index 21fbea83888..ceeb1051cf3 100644 --- a/src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.hpp +++ b/src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.hpp 
@@ -294,6 +294,9 @@ class DiscoveryDataBase // Check if the data queue is empty bool data_queue_empty(); + // Swap both EDP and PDP data queues + void swap_data_queues(); + void to_json( nlohmann::json& j) const; diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp index 7b80ffb0d33..282d4cf768d 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp @@ -1080,6 +1080,9 @@ bool PDPServer::remove_remote_participant( bool PDPServer::process_data_queues() { EPROSIMA_LOG_INFO(RTPS_PDP_SERVER, "process_data_queues start"); + // Swap both as a first step in order to avoid the following race condition: reception of data w/r while processing + // the PDP queue, not having processed yet the corresponding data P (also received while processing the queue). + discovery_db_.swap_data_queues(); discovery_db_.process_pdp_data_queue(); return discovery_db_.process_edp_data_queue(); } diff --git a/test/blackbox/common/BlackboxTestsDiscovery.cpp b/test/blackbox/common/BlackboxTestsDiscovery.cpp index d843790f7ca..f3f1eeb7b11 100644 --- a/test/blackbox/common/BlackboxTestsDiscovery.cpp +++ b/test/blackbox/common/BlackboxTestsDiscovery.cpp @@ -1927,6 +1927,8 @@ TEST_P(Discovery, discovery_server_pdp_messages_sent) EXPECT_EQ(num_data_p_sends.load(std::memory_order::memory_order_seq_cst), 3u); } +// This test checks that a Discovery Server does not send duplicated EDP messages when its routine +// is triggered by EDP Listeners while it waits for ACKs TEST_P(Discovery, discovery_server_edp_messages_sent) { // Skip test in intraprocess and datasharing mode @@ -2106,3 +2108,154 @@ TEST_P(Discovery, discovery_server_edp_messages_sent) EXPECT_EQ(num_data_r_w_sends_s1.load(std::memory_order::memory_order_seq_cst), 2u); EXPECT_EQ(num_data_r_w_sends_s2.load(std::memory_order::memory_order_seq_cst), 2u); } + +// This is a regression test for 
Redmine #23088, which corrects the following data race in the discovery server: +// a) When a participant is being removed (either because it is being deleted or because its lease duration has expired), +// it is not deleted from participants_ map until the server has received all ACKs from clients of the Data(Up). +// b) If the same participant is re-discovered (Data(p) received) while the server is waiting for the Data(Up) ACKs, +// it will not be updated in the participants_ map as ALIVE. +// c) If the Data(Up) ACKs and the Data(r/w) of the rediscovered participant are received at the same time, +// the server will delete the participant and try to process the Data(r/w) messages in the same routine. +// The server will try to register a reader/writer from a deleted participant, which will result in the error: +// "Matching unexisting participant from reader/writer". +// This test checks that this does not happen after fixing point b) and updating the participant to ALIVE. +TEST_P(Discovery, discovery_server_rediscover_participant_being_removed) +{ + // Skip test in intraprocess and datasharing mode + if (TRANSPORT != GetParam()) + { + GTEST_SKIP() << "Only makes sense on TRANSPORT"; + return; + } + + using namespace eprosima::fastdds::dds; + + // One discovery server will be created, with two direct clients connected to it. + // Client 1 will be removed and then relaunched, while the Client 2 will be kept alive. + // Client 2 ACKs will be blocked to simulate a slow discovery process. 
+ + std::atomic filter_activated { false }; + auto block_data_up_acks = [&filter_activated](CDRMessage_t& msg) + { + // Filter Data(Up) ACKs messages + if (filter_activated.load(std::memory_order::memory_order_seq_cst)) + { + // Go back to submsgkind + auto submsgkind_pos = msg.pos - 4; + auto acknack_submsg = eprosima::fastdds::helpers::cdr_parse_acknack_submsg( + (char*)&msg.buffer[submsgkind_pos], + msg.length - submsgkind_pos); + + assert(acknack_submsg.submsgHeader().submessageId() == ACKNACK); + + if (eprosima::fastdds::rtps::c_EntityId_SPDPWriter == + *reinterpret_cast(&acknack_submsg.writerId()) || + eprosima::fastdds::rtps::c_EntityId_SEDPPubWriter == + *reinterpret_cast(&acknack_submsg.writerId())) + { + std::cout << "Blocking Data(Up) ACKs" << std::endl; + return true; + } + } + return false; + }; + + // Declare a test transport that will count the number of Data(p) messages sent + auto default_udp_transport = std::make_shared(); + auto test_transport = std::make_shared(); + test_transport->drop_ack_nack_messages_filter_ = [&](CDRMessage_t& msg) + { + return block_data_up_acks(msg); + }; + + // Create the main participant + auto server = std::make_shared>(0, 0, 0, 0); + + Locator_t locator_server; // UDPv4 locator by default + eprosima::fastdds::rtps::IPLocator::setIPv4(locator_server, 127, 0, 0, 1); + eprosima::fastdds::rtps::IPLocator::setPhysicalPort(locator_server, global_port); + + WireProtocolConfigQos server_wp_qos; + server_wp_qos.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol::SERVER; + server_wp_qos.builtin.metatrafficUnicastLocatorList.push_back(locator_server); + // Raise the routine period to control the discovery process + server_wp_qos.builtin.discovery_config.discoveryServer_client_syncperiod = { 5, 0 }; + server_wp_qos.builtin.discovery_config.leaseDuration = { 60, 0 }; + server_wp_qos.builtin.discovery_config.leaseDuration_announcementperiod = { 59, 0 }; + 
server_wp_qos.builtin.discovery_config.initial_announcements.count = 1; + + // The main participant will use the test transport and a specific announcements configuration + server->disable_builtin_transport() + .add_user_transport_to_pparams(default_udp_transport) + .wire_protocol(server_wp_qos); + + // Start the main participant + ASSERT_TRUE(server->init_participant()); + + // Create clients + std::shared_ptr> client_1 = + std::make_shared>(1u, 0u, 0u, 1u); + std::shared_ptr> client_2 = + std::make_shared>(0u, 1u, 1u, 0u); + // Set participant as client + WireProtocolConfigQos client_qos; + client_qos.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol::CLIENT; + client_qos.builtin.discovery_config.m_DiscoveryServers.push_back(locator_server); + client_qos.builtin.discovery_config.leaseDuration = { 60, 0 }; + client_qos.builtin.discovery_config.leaseDuration_announcementperiod = { 59, 0 }; + client_qos.builtin.discovery_config.initial_announcements.count = 1; + std::istringstream("64.61.74.61.5f.72.61.63.65.5f.64.73") >> client_qos.prefix; + + // Init client 1 + client_1->wire_protocol(client_qos) + .setup_transports(eprosima::fastdds::rtps::BuiltinTransports::UDPv4) + .pub_topic_name(TEST_TOPIC_NAME); + ASSERT_TRUE(client_1->init_participant()); + ASSERT_TRUE(client_1->init_publisher(0u)); + + // Init client 2 + WireProtocolConfigQos client2_qos = client_qos; + std::istringstream("73.61.74.61.5f.72.61.63.65.5f.64.73") >> client2_qos.prefix; + client_2->wire_protocol(client2_qos) + .disable_builtin_transport() + .add_user_transport_to_pparams(test_transport) + .sub_topic_name(TEST_TOPIC_NAME); + ASSERT_TRUE(client_2->init_participant()); + ASSERT_TRUE(client_2->init_subscriber(0u)); + + // Wait at least the servers routine period to discover endpoints + ASSERT_TRUE(server->wait_discovery(std::chrono::seconds(6), 2, true)); + client_2->sub_wait_discovery(1); + client_1->pub_wait_discovery(1); + + // Server discovered both clients, activate filter to 
block Data(Up) ACKs + std::this_thread::sleep_for(std::chrono::seconds(1)); + std::cout << "Blocking Data(Up) ACKs activated" << std::endl; + filter_activated.store(true, std::memory_order::memory_order_seq_cst); + + // Remove client 1 + client_1.reset(); + ASSERT_TRUE(client_1 == nullptr); + // Ensure client 2 has unmatched the client 1 + client_2->sub_wait_discovery(0); + std::this_thread::sleep_for(std::chrono::seconds(3)); + + // Relaunch client 1 and deactivate the filter + client_1 = std::make_shared>(1u, 0u, 0u, 1u); + client_1->wire_protocol(client_qos) + .setup_transports(eprosima::fastdds::rtps::BuiltinTransports::UDPv4) + .pub_topic_name(TEST_TOPIC_NAME); + // Init only the participant, publisher will be initialized later + ASSERT_TRUE(client_1->init_participant()); + std::cout << "Blocking Data(Up) ACKs deactivated" << std::endl; + filter_activated.store(false, std::memory_order::memory_order_seq_cst); + + // Give time to receive Data(Up) ACK, process the Data(p) and update the participant, + // but do it before a whole period of the server's routine. + // In this way new Data(w) and Data(Up) ACK are processed in the same routine + std::this_thread::sleep_for(std::chrono::seconds(1)); + ASSERT_TRUE(client_1->init_publisher(0u)); + + // Client 2 should discover client 1 again and log error "Writer has no associated participant." should not appear + client_2->sub_wait_discovery(1); +}