From 0ed5db0ca2b4ac61abe580ec677fe7438bb35c25 Mon Sep 17 00:00:00 2001 From: juanlofer-eprosima <88179026+juanlofer-eprosima@users.noreply.github.com> Date: Thu, 8 May 2025 07:32:16 +0200 Subject: [PATCH] Solve Discovery Server race conditions (#5780) * Refs #23088: Test reconnection when removing participant Signed-off-by: cferreiragonz * Refs #23088: Solve EDP-PDP queues race condition Signed-off-by: Juan Lopez Fernandez * Refs #23088: Solve data UP + data P race condition Signed-off-by: Juan Lopez Fernandez * Refs #23088: Abort writer/reader processing if associated participant not alive Signed-off-by: Juan Lopez Fernandez * Refs #23088: Apply suggestions Signed-off-by: Juan Lopez Fernandez * Refs #23088: Release change when writer/reader insertion in DB failed Signed-off-by: Juan Lopez Fernandez * Refs #23088: Match servers after change update Signed-off-by: Juan Lopez Fernandez --------- Signed-off-by: cferreiragonz Signed-off-by: Juan Lopez Fernandez Co-authored-by: cferreiragonz (cherry picked from commit ec666f7276d9afe8f303b44f70cf397746ebbdf4) # Conflicts: # src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.cpp # test/blackbox/common/BlackboxTestsDiscovery.cpp --- .../discovery/database/DiscoveryDataBase.cpp | 170 ++++- .../discovery/database/DiscoveryDataBase.hpp | 3 + .../discovery/participant/PDPServer.cpp | 3 + .../common/BlackboxTestsDiscovery.cpp | 627 ++++++++++++++++++ 4 files changed, 797 insertions(+), 6 deletions(-) diff --git a/src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.cpp b/src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.cpp index c6fa1c574a0..e85cef67e18 100644 --- a/src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.cpp +++ b/src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.cpp @@ -479,9 +479,6 @@ void DiscoveryDataBase::process_pdp_data_queue() // Lock(exclusive mode) mutex locally std::lock_guard guard(mutex_); - // Swap DATA queues - pdp_data_queue_.Swap(); - // Process all messages in the queque while (!pdp_data_queue_.Empty()) { @@ -519,10 +516,14 @@ bool DiscoveryDataBase::process_edp_data_queue() // Lock(exclusive mode) mutex locally std::lock_guard guard(mutex_); +<<<<<<< HEAD // Swap DATA queues edp_data_queue_.Swap(); eprosima::fastrtps::rtps::CacheChange_t* change; +======= + eprosima::fastdds::rtps::CacheChange_t* change; +>>>>>>> ec666f72 (Solve Discovery Server race conditions (#5780)) std::string topic_name; // Process all messages in the queque @@ -603,6 +604,52 @@ void DiscoveryDataBase::match_new_server_( assert(our_data_it != participants_.end()); add_pdp_to_send_(our_data_it->second.change()); +<<<<<<< HEAD +======= + if (!is_superclient) + { + // To obtain a mesh topology with servers, we need to: + // - Make all known servers relevant to the new server + // - Make the new server relevant to all known servers + // - Send DATA(p) of all known servers to the new server + // - Send Data(p) of the new server to all other servers + for (auto& part : participants_) + { + if (part.first != server_guid_prefix_ && !part.second.is_client() && !part.second.is_superclient()) + { + if (part.first == participant_prefix) + { + std::lock_guard guard(mutex_); + bool resend_new_pdp = false; + for (auto& server: servers_) + { + if (server != participant_prefix) + { + // Make all known servers relevant to the new server, but not matched + part.second.add_or_update_ack_participant(server, ParticipantState::PENDING_SEND); + resend_new_pdp = true; + } + } + if (resend_new_pdp) + { + // Send DATA(p) of the new server 
to all other servers. + add_pdp_to_send_(part.second.change()); + } + } + else + { + // Make the new server relevant to all known servers + part.second.add_or_update_ack_participant(participant_prefix, ParticipantState::PENDING_SEND); + // Send DATA(p) of all known servers to the new participant + add_pdp_to_send_(part.second.change()); + } + } + } + } + // The resources needed for TCP new connections are created during the matching process when the + // DATA(p) is received by each server. + +>>>>>>> ec666f72 (Solve Discovery Server race conditions (#5780)) // Create virtual endpoints create_virtual_endpoints_(participant_prefix); } @@ -728,13 +775,52 @@ void DiscoveryDataBase::update_participant_from_change_( { fastrtps::rtps::GUID_t change_guid = guid_from_change(ch); + assert(ch->kind == eprosima::fastdds::rtps::ALIVE); + + // If the change corresponds to a previously removed participant (which hasn't yet been removed from the map since + // the DATA(Up) is still unacked), update map with new data and behave as if it was a new participant. + // Remove also the old change from the disposals collection, if it was added just before + if (participant_info.change()->kind != eprosima::fastdds::rtps::ALIVE) + { + // Update the change data + participant_info.participant_change_data(change_data); + + // Remove old change from disposals if it was added just before to avoid sending data UP + auto it = std::find(disposals_.begin(), disposals_.end(), participant_info.change()); + if (it != disposals_.end()) + { + disposals_.erase(it); + } + + // Update change. This should add the UNALIVE change to changes_to_release_, which should later both remove the + // change from the writer's history and release the change + update_change_and_unmatch_(ch, participant_info); + + // If it is local and server we have to create virtual endpoints, except for our own server + if (change_guid.guidPrefix != server_guid_prefix_ && !change_data.is_client() && change_data.is_local()) + { + // Match new server and create virtual endpoints + // NOTE: match after having updated the change, so virtual endpoints are not discarded for having + // an associated unalive participant + match_new_server_(change_guid.guidPrefix, change_data.is_superclient()); + } + + // Treat as a new participant found + new_updates_++; + if (change_guid.guidPrefix != server_guid_prefix_) + { + server_acked_by_all(false); + } + } + // Specific case when a Data(P) from an entity A known as remote comes from the very entity A (we have // the Data(P) because of other server B, but now it arrives from A itself) // The entity A changes to local // Must be local data, or else it is a remote endpoint and should not be changed - if (change_guid.guidPrefix != server_guid_prefix_ && change_data.is_local() && + else if (change_guid.guidPrefix != server_guid_prefix_ && change_data.is_local() && DiscoveryDataBase::participant_data_has_changed_(participant_info, change_data)) { +<<<<<<< HEAD // If the participant changes to server local, virtual endpoints must be added // If it is local and server the only possibility is it was a remote server and it must be converted to local if (!change_data.is_client()) @@ -742,18 +828,28 @@ void DiscoveryDataBase::update_participant_from_change_( match_new_server_(change_guid.guidPrefix); } +======= +>>>>>>> ec666f72 (Solve Discovery Server race conditions (#5780)) // Update the change data participant_info.participant_change_data(change_data); // Update change update_change_and_unmatch_(ch, participant_info); + // If the 
participant changes to server local, virtual endpoints must be added + // If it is local and server the only possibility is it was a remote server and it must be converted to local + if (!change_data.is_client()) + { + // NOTE: match after having updated the change in order to send the new Data(P) + match_new_server_(change_guid.guidPrefix, change_data.is_superclient()); + } + // Treat as a new participant found new_updates_++; server_acked_by_all(false); // It is possible that this Data(P) is in our history if it has not been acked by all - // In this case we have to resent it with the new update + // In this case we have to resend it with the new update if (!participant_info.is_acked_by_all()) { add_pdp_to_send_(ch); @@ -856,6 +952,29 @@ void DiscoveryDataBase::create_writers_from_change_( // The writer was NOT known by the database else { + // Check if corresponding participant is known, abort otherwise + // NOTE: Processing a DATA(w) should always be preceded by the reception and processing of its corresponding + // participant. However, one may receive a DATA(w) just after the participant has been removed, case in which the + // former should no longer be processed. + std::map::iterator writer_part_it = + participants_.find(writer_guid.guidPrefix); + if (writer_part_it == participants_.end()) + { + EPROSIMA_LOG_ERROR(DISCOVERY_DATABASE, + "Writer " << writer_guid << " has no associated participant. Skipping"); + assert(topic_name != virtual_topic_); + changes_to_release_.push_back(ch); // Release change so it can be reused + return; + } + else if (writer_part_it->second.change()->kind != fastdds::rtps::ChangeKind_t::ALIVE) + { + EPROSIMA_LOG_WARNING(DISCOVERY_DATABASE, + "Writer " << writer_guid << " is associated to a removed participant. Skipping"); + assert(topic_name != virtual_topic_); + changes_to_release_.push_back(ch); // Release change so it can be reused + return; + } + // Add entry to writers_ DiscoveryEndpointInfo tmp_writer( ch, @@ -876,6 +995,7 @@ void DiscoveryDataBase::create_writers_from_change_( new_updates_++; // Add entry to participants_[guid_prefix]::writers +<<<<<<< HEAD std::map::iterator writer_part_it = participants_.find(writer_guid.guidPrefix); if (writer_part_it != participants_.end()) @@ -888,6 +1008,9 @@ void DiscoveryDataBase::create_writers_from_change_( "Writer " << writer_guid << " has no associated participant. Skipping"); return; } +======= + writer_part_it->second.add_writer(writer_guid); +>>>>>>> ec666f72 (Solve Discovery Server race conditions (#5780)) // Add writer to writers_by_topic_[topic_name] add_writer_to_topic_(writer_guid, topic_name); @@ -974,6 +1097,29 @@ void DiscoveryDataBase::create_readers_from_change_( // The reader was NOT known by the database else { + // Check if corresponding participant is known, abort otherwise + // NOTE: Processing a DATA(r) should always be preceded by the reception and processing of its corresponding + // participant. However, one may receive a DATA(r) just after the participant has been removed, case in which the + // former should no longer be processed. + std::map::iterator reader_part_it = + participants_.find(reader_guid.guidPrefix); + if (reader_part_it == participants_.end()) + { + EPROSIMA_LOG_ERROR(DISCOVERY_DATABASE, + "Reader " << reader_guid << " has no associated participant. 
Skipping"); + assert(topic_name != virtual_topic_); + changes_to_release_.push_back(ch); // Release change so it can be reused + return; + } + else if (reader_part_it->second.change()->kind != fastdds::rtps::ChangeKind_t::ALIVE) + { + EPROSIMA_LOG_WARNING(DISCOVERY_DATABASE, + "Reader " << reader_guid << " is associated to a removed participant. Skipping"); + assert(topic_name != virtual_topic_); + changes_to_release_.push_back(ch); // Release change so it can be reused + return; + } + // Add entry to readers_ DiscoveryEndpointInfo tmp_reader( ch, @@ -994,6 +1140,7 @@ void DiscoveryDataBase::create_readers_from_change_( new_updates_++; // Add entry to participants_[guid_prefix]::readers +<<<<<<< HEAD std::map::iterator reader_part_it = participants_.find(reader_guid.guidPrefix); if (reader_part_it != participants_.end()) @@ -1006,6 +1153,9 @@ void DiscoveryDataBase::create_readers_from_change_( "Reader " << reader_guid << " has no associated participant. Skipping"); return; } +======= + reader_part_it->second.add_reader(reader_guid); +>>>>>>> ec666f72 (Solve Discovery Server race conditions (#5780)) // Add reader to readers_by_topic_[topic_name] add_reader_to_topic_(reader_guid, topic_name); @@ -1285,7 +1435,7 @@ void DiscoveryDataBase::process_dispose_participant_( delete_reader_entity_(reader_guid); } - // All participant endoints must be already unmatched in others endopoints relevant_ack maps + // All participant endpoints must be already unmatched in others endpoints relevant_ack maps // Unmatch own participant unmatch_participant_(participant_guid.guidPrefix); @@ -1542,6 +1692,14 @@ bool DiscoveryDataBase::data_queue_empty() return (pdp_data_queue_.BothEmpty() && edp_data_queue_.BothEmpty()); } +void DiscoveryDataBase::swap_data_queues() +{ + // Swap EDP before PDP to avoid race condition in which both data P and w/r are received at the same time, + // just after having swapped the PDP queue + edp_data_queue_.Swap(); + pdp_data_queue_.Swap(); +} + bool DiscoveryDataBase::is_participant( const eprosima::fastrtps::rtps::GUID_t& guid) { diff --git a/src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.hpp b/src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.hpp index f5ca2daf801..fb16a3d6b63 100644 --- a/src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.hpp +++ b/src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.hpp @@ -300,6 +300,9 @@ class DiscoveryDataBase // Check if the data queue is empty bool data_queue_empty(); + // Swap both EDP and PDP data queues + void swap_data_queues(); + void to_json( nlohmann::json& j) const; diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp index f23e31467f8..c04a680b3dd 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp @@ -1098,6 +1098,9 @@ bool PDPServer::remove_remote_participant( bool PDPServer::process_data_queues() { EPROSIMA_LOG_INFO(RTPS_PDP_SERVER, "process_data_queues start"); + // Swap both as a first step in order to avoid the following race condition: reception of data w/r while processing + // the PDP queue, not having processed yet the corresponding data P (also received while processing the queue). 
+ discovery_db_.swap_data_queues(); discovery_db_.process_pdp_data_queue(); return discovery_db_.process_edp_data_queue(); } diff --git a/test/blackbox/common/BlackboxTestsDiscovery.cpp b/test/blackbox/common/BlackboxTestsDiscovery.cpp index aaa0395ce69..5290e814df9 100644 --- a/test/blackbox/common/BlackboxTestsDiscovery.cpp +++ b/test/blackbox/common/BlackboxTestsDiscovery.cpp @@ -2068,3 +2068,630 @@ TEST(Discovery, MulticastInitialPeer) writer.wait_discovery(); reader.wait_discovery(); } +<<<<<<< HEAD +======= + +//! Regression test for redmine issue 17162 +TEST(Discovery, MultipleXMLProfileLoad) +{ + // These test may fail because one of the participants disappear before the other has found it. + // Thus, use condition variable so threads only finish once the discovery has taken place. + std::condition_variable cv; + std::mutex cv_mtx; + std::atomic n_discoveries(0); + + auto participant_creation_reader = [&]() + { + PubSubReader participant(TEST_TOPIC_NAME); + participant.init(); + participant.wait_discovery(); + + // Notify discovery has happen + { + std::unique_lock lock(cv_mtx); + n_discoveries++; + } + cv.notify_all(); + + std::unique_lock lock(cv_mtx); + cv.wait( + lock, + [&]() + { + return n_discoveries >= 2; + } + ); + }; + + auto participant_creation_writer = [&]() + { + PubSubWriter participant(TEST_TOPIC_NAME); + participant.init(); + participant.wait_discovery(); + + // Notify discovery has happen + { + std::unique_lock lock(cv_mtx); + n_discoveries++; + } + cv.notify_all(); + + std::unique_lock lock(cv_mtx); + cv.wait( + lock, + [&]() + { + return n_discoveries >= 2; + } + ); + }; + + // Start thread creating second participant + std::thread thr_reader(participant_creation_reader); + std::thread thr_writer(participant_creation_writer); + + thr_reader.join(); + thr_writer.join(); +} + +//! 
Regression test for redmine issue 20641 +TEST(Discovery, discovery_cyclone_participant_with_custom_pid) +{ + using namespace eprosima::fastdds::dds; + using namespace eprosima::fastdds::rtps; + + /* Custom participant listener to count number of discovered participants */ + class DiscoveryListener : public DomainParticipantListener + { + public: + + void on_participant_discovery( + DomainParticipant*, + ParticipantDiscoveryStatus status, + const ParticipantBuiltinTopicData& /*info*/, + bool& should_be_ignored) override + { + should_be_ignored = false; + if (ParticipantDiscoveryStatus::DISCOVERED_PARTICIPANT == status) + { + discovered_participants_++; + } + else if (ParticipantDiscoveryStatus::REMOVED_PARTICIPANT == status) + { + discovered_participants_--; + } + } + + uint8_t discovered_participants() const + { + return discovered_participants_; + } + + private: + + using DomainParticipantListener::on_participant_discovery; + + std::atomic discovered_participants_{0}; + }; + + /* Create a datagram injection transport */ + using eprosima::fastdds::rtps::DatagramInjectionTransportDescriptor; + using eprosima::fastdds::rtps::DatagramInjectionTransport; + auto low_level_transport = std::make_shared(); + auto transport = std::make_shared(low_level_transport); + + /* Disable builtin transport and add custom one */ + DomainParticipantQos participant_qos = PARTICIPANT_QOS_DEFAULT; + participant_qos.transport().use_builtin_transports = false; + participant_qos.transport().user_transports.clear(); + participant_qos.transport().user_transports.push_back(transport); + + /* Create participant with custom transport and listener */ + DiscoveryListener listener; + /* We need to match the domain id in the datagram */ + uint32_t domain_id = 0; + DomainParticipantFactory* factory = DomainParticipantFactory::get_instance(); + DomainParticipant* participant = factory->create_participant(domain_id, participant_qos, &listener); + ASSERT_NE(nullptr, participant); + + /* Inject a Cyclone DDS Data(p) with a custom PID that we also use */ + auto receivers = transport->get_receivers(); + ASSERT_FALSE(receivers.empty()); + DatagramInjectionTransport::deliver_datagram_from_file(receivers, "datagrams/20641.bin"); + + /* Assert that the participant is discovered */ + ASSERT_EQ(listener.discovered_participants(), 1u); + + /* Clean up */ + factory->delete_participant(participant); +} + +// This test checks that a Discover Server does not send duplicated PDP messages of itself when new clients +// are discovered +TEST_P(Discovery, discovery_server_pdp_messages_sent) +{ + // Skip test in intraprocess and datasharing mode + if (TRANSPORT != GetParam()) + { + GTEST_SKIP() << "Only makes sense on TRANSPORT"; + return; + } + + using namespace eprosima::fastdds::dds; + + // One discovery server will be created, with multiple direct clients connected to it. + // Initial announcements will be disabled and lease announcements will be configured to control discovery sequence. + // The main participant will use the test transport to count the number of Data(p) sent. 
+ + // Look for the PID_DOMAIN_ID in the message as it is only present in Data(p) messages + auto builtin_msg_is_data_p = [](CDRMessage_t& msg, std::atomic& num_data_p) + { + uint32_t qos_size = 0; + uint32_t original_pos = msg.pos; + bool is_sentinel = false; + bool inline_qos_msg = false; + + while (!is_sentinel) + { + msg.pos = original_pos + qos_size; + + uint16_t pid = eprosima::fastdds::helpers::cdr_parse_u16( + (char*)&msg.buffer[msg.pos]); + msg.pos += 2; + uint16_t plength = eprosima::fastdds::helpers::cdr_parse_u16( + (char*)&msg.buffer[msg.pos]); + msg.pos += 2; + bool valid = true; + + // If inline_qos submessage is found we will have an additional Sentinel + if (pid == eprosima::fastdds::dds::PID_RELATED_SAMPLE_IDENTITY) + { + inline_qos_msg = true; + } + else if (pid == eprosima::fastdds::dds::PID_SENTINEL) + { + // PID_SENTINEL is always considered of length 0 + plength = 0; + if (!inline_qos_msg) + { + // If the PID is not inline qos, then we need to set the sentinel + // to true, as it is the last PID + is_sentinel = true; + } + } + + qos_size += (4 + plength); + + // Align to 4 byte boundary and prepare for next iteration + qos_size = (qos_size + 3) & ~3; + + if (!valid || ((msg.pos + plength) > msg.length)) + { + return false; + } + else if (!is_sentinel) + { + if (pid == eprosima::fastdds::dds::PID_DOMAIN_ID) + { + std::cout << "Data(p) sent by the server" << std::endl; + inline_qos_msg = false; + num_data_p.fetch_add(1u, std::memory_order_seq_cst); + break; + } + } + } + + // Do not drop the packet in any case + return false; + }; + + // Declare a test transport that will count the number of Data(p) messages sent + std::atomic num_data_p_sends{ 0 }; + auto test_transport = std::make_shared(); + test_transport->drop_builtin_data_messages_filter_ = [&](CDRMessage_t& msg) + { + return builtin_msg_is_data_p(msg, num_data_p_sends); + }; + + // Create the main participant + auto server = std::make_shared>(0, 0, 0, 0); + + Locator_t locator_server; // UDPv4 locator by default + eprosima::fastdds::rtps::IPLocator::setIPv4(locator_server, 127, 0, 0, 1); + eprosima::fastdds::rtps::IPLocator::setPhysicalPort(locator_server, global_port); + + WireProtocolConfigQos server_wp_qos; + server_wp_qos.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol::SERVER; + server_wp_qos.builtin.metatrafficUnicastLocatorList.push_back(locator_server); + + server_wp_qos.builtin.discovery_config.leaseDuration = c_TimeInfinite; + server_wp_qos.builtin.discovery_config.leaseDuration_announcementperiod = c_TimeInfinite; + server_wp_qos.builtin.discovery_config.initial_announcements.count = 0; + + // The main participant will use the test transport and a specific announcements configuration + server->disable_builtin_transport().add_user_transport_to_pparams(test_transport) + .wire_protocol(server_wp_qos); + + // Start the main participant + ASSERT_TRUE(server->init_participant()); + + // Create a client that connects to the first server + PubSubParticipant client_1(0u, 0u, 0u, 0u); + PubSubParticipant client_2(0u, 0u, 0u, 0u); + PubSubParticipant client_3(0u, 0u, 0u, 0u); + // Set participant as client + WireProtocolConfigQos client_qos; + client_qos.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol::CLIENT; + client_qos.builtin.discovery_config.m_DiscoveryServers.push_back(locator_server); + client_qos.builtin.discovery_config.leaseDuration = c_TimeInfinite; + client_qos.builtin.discovery_config.leaseDuration_announcementperiod = c_TimeInfinite; + 
client_qos.builtin.discovery_config.initial_announcements.count = 1; + // Init client 1 + ASSERT_TRUE(client_1.wire_protocol(client_qos) + .setup_transports(eprosima::fastdds::rtps::BuiltinTransports::UDPv4) + .init_participant()); + + // Wait for the initial announcements to be sent + server->wait_discovery(std::chrono::seconds(5), 1, true); + // Let some time for the server to run the internal routine and check if it sent Data(p) + std::this_thread::sleep_for(std::chrono::seconds(3)); + EXPECT_EQ(num_data_p_sends.load(std::memory_order::memory_order_seq_cst), 1u); + + // Init client 2 + ASSERT_TRUE(client_2.wire_protocol(client_qos) + .setup_transports(eprosima::fastdds::rtps::BuiltinTransports::UDPv4) + .init_participant()); + + + // Wait for the initial announcements to be sent + server->wait_discovery(std::chrono::seconds(5), 2, true); + // Let some time for the server to run the internal routine and check if it sent Data(p) + std::this_thread::sleep_for(std::chrono::seconds(3)); + EXPECT_EQ(num_data_p_sends.load(std::memory_order::memory_order_seq_cst), 2u); + + // Init client 3 + ASSERT_TRUE(client_3.wire_protocol(client_qos) + .setup_transports(eprosima::fastdds::rtps::BuiltinTransports::UDPv4) + .init_participant()); + + + // Wait for the initial announcements to be sent + server->wait_discovery(std::chrono::seconds(5), 3, true); + // Let some time for the server to run the internal routine and check if it sent Data(p) + std::this_thread::sleep_for(std::chrono::seconds(3)); + EXPECT_EQ(num_data_p_sends.load(std::memory_order::memory_order_seq_cst), 3u); +} + +// This test checks that a Discover Server does not send duplicated EDP messages when its routine +// is triggered by EDP Listeners while it waits for ACKs +TEST_P(Discovery, discovery_server_edp_messages_sent) +{ + // Skip test in intraprocess and datasharing mode + if (TRANSPORT != GetParam()) + { + GTEST_SKIP() << "Only makes sense on TRANSPORT"; + return; + } + + using namespace eprosima::fastdds::dds; + + // Two discovery servers will be created, each with a direct client connected to them. + // Initial announcements will be disabled and lease announcements will be configured to control discovery sequence. + // The main participant will use the test transport to count the number of Data(r/w) sent. 
+ + // Look for the PID_ENDPOINT_GUID in the message as it is only present in Data(r/w) messages + auto builtin_msg_is_data_r_w = [](CDRMessage_t& msg, std::atomic& num_data_r_w) + { + uint32_t qos_size = 0; + uint32_t original_pos = msg.pos; + bool is_sentinel = false; + bool inline_qos_msg = false; + + while (!is_sentinel) + { + msg.pos = original_pos + qos_size; + + uint16_t pid = eprosima::fastdds::helpers::cdr_parse_u16( + (char*)&msg.buffer[msg.pos]); + msg.pos += 2; + uint16_t plength = eprosima::fastdds::helpers::cdr_parse_u16( + (char*)&msg.buffer[msg.pos]); + msg.pos += 2; + bool valid = true; + + if (pid == eprosima::fastdds::dds::PID_RELATED_SAMPLE_IDENTITY) + { + inline_qos_msg = true; + } + else if (pid == eprosima::fastdds::dds::PID_SENTINEL) + { + // PID_SENTINEL is always considered of length 0 + plength = 0; + if (!inline_qos_msg) + { + // If the PID is not inline qos, then we need to set the sentinel + // to true, as it is the last PID + is_sentinel = true; + } + } + + qos_size += (4 + plength); + + // Align to 4 byte boundary and prepare for next iteration + qos_size = (qos_size + 3) & ~3; + + if (!valid || ((msg.pos + plength) > msg.length)) + { + return false; + } + else if (!is_sentinel) + { + if (pid == eprosima::fastdds::dds::PID_ENDPOINT_GUID) + { + std::cout << "Data (r/w) sent by the server" << std::endl; + num_data_r_w.fetch_add(1u, std::memory_order_seq_cst); + break; + } + else if (pid == eprosima::fastdds::dds::PID_VENDORID) + { + // Vendor ID is present in both Data(p) and Data(r/w) messages + inline_qos_msg = false; + } + } + } + + // Do not drop the packet in any case + return false; + }; + + // Declare a test transport that will count the number of Data(r/w) messages sent + std::atomic num_data_r_w_sends_s1{ 0 }; + std::atomic num_data_r_w_sends_s2{ 0 }; + auto test_transport_s1 = std::make_shared(); + test_transport_s1->drop_builtin_data_messages_filter_ = [&](CDRMessage_t& msg) + { + return builtin_msg_is_data_r_w(msg, num_data_r_w_sends_s1); + }; + + auto test_transport_s2 = std::make_shared(); + test_transport_s2->drop_builtin_data_messages_filter_ = [&](CDRMessage_t& msg) + { + return builtin_msg_is_data_r_w(msg, num_data_r_w_sends_s2); + }; + + // Create server 1 + auto server_1 = std::make_shared>(0, 0, 0, 0); + + Locator_t locator_server_1; // UDPv4 locator by default + eprosima::fastdds::rtps::IPLocator::setIPv4(locator_server_1, 127, 0, 0, 1); + eprosima::fastdds::rtps::IPLocator::setPhysicalPort(locator_server_1, global_port); + + WireProtocolConfigQos server_wp_qos_1; + server_wp_qos_1.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol::SERVER; + server_wp_qos_1.builtin.metatrafficUnicastLocatorList.push_back(locator_server_1); + + server_wp_qos_1.builtin.discovery_config.leaseDuration = c_TimeInfinite; + server_wp_qos_1.builtin.discovery_config.leaseDuration_announcementperiod = c_TimeInfinite; + server_wp_qos_1.builtin.discovery_config.initial_announcements.count = 0; + + // The main participant will use the test transport and a specific announcements configuration + server_1->disable_builtin_transport().add_user_transport_to_pparams(test_transport_s1) + .wire_protocol(server_wp_qos_1); + + // Start the main participant + ASSERT_TRUE(server_1->init_participant()); + + // Create server 2 + auto server_2 = std::make_shared>(0, 0, 0, 0); + + Locator_t locator_server_2 = locator_server_1; // UDPv4 locator by default + eprosima::fastdds::rtps::IPLocator::setPhysicalPort(locator_server_2, global_port + 1); + + WireProtocolConfigQos 
server_wp_qos_2 = server_wp_qos_1; + server_wp_qos_2.builtin.metatrafficUnicastLocatorList.clear(); + server_wp_qos_2.builtin.metatrafficUnicastLocatorList.push_back(locator_server_2); + // Configure 1 initial announcement as this Server will connect to the first one + server_wp_qos_2.builtin.discovery_config.initial_announcements.count = 1; + server_wp_qos_2.builtin.discovery_config.m_DiscoveryServers.push_back(locator_server_1); + + // The main participant will use the test transport and a specific announcements configuration + server_2->disable_builtin_transport().add_user_transport_to_pparams(test_transport_s2) + .wire_protocol(server_wp_qos_2); + + // Start the main participant + ASSERT_TRUE(server_2->init_participant()); + + // Both servers match + server_1->wait_discovery(std::chrono::seconds(5), 1, true); + server_2->wait_discovery(std::chrono::seconds(5), 1, true); + // Let some time for the server to run the internal routine and match virtual endpoints + std::this_thread::sleep_for(std::chrono::seconds(2)); + + // Create a client that connects to their corresponding server + PubSubWriter client_1(TEST_TOPIC_NAME); + PubSubReader client_2(TEST_TOPIC_NAME); + // Set participant as client + WireProtocolConfigQos client_qos; + client_qos.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol::CLIENT; + client_qos.builtin.discovery_config.m_DiscoveryServers.push_back(locator_server_1); + client_qos.builtin.discovery_config.leaseDuration = c_TimeInfinite; + client_qos.builtin.discovery_config.leaseDuration_announcementperiod = { 15, 0 }; + client_qos.builtin.discovery_config.initial_announcements.count = 0; + + // Init client 1 + client_1.set_wire_protocol_qos(client_qos) + .setup_transports(eprosima::fastdds::rtps::BuiltinTransports::UDPv4) + .init(); + + // Init client 2 + client_qos.builtin.discovery_config.m_DiscoveryServers.clear(); + client_qos.builtin.discovery_config.m_DiscoveryServers.push_back(locator_server_2); + client_2.set_wire_protocol_qos(client_qos) + .setup_transports(eprosima::fastdds::rtps::BuiltinTransports::UDPv4) + .init(); + + ASSERT_TRUE(client_1.isInitialized()); + ASSERT_TRUE(client_2.isInitialized()); + + // Wait the lease announcement period to discover endpoints + server_1->wait_discovery(std::chrono::seconds(5), 2, true); + server_2->wait_discovery(std::chrono::seconds(5), 2, true); + + // Ensure that no additional Data(r/w) messages are sent by DS routine + std::this_thread::sleep_for(std::chrono::seconds(15)); + + EXPECT_EQ(num_data_r_w_sends_s1.load(std::memory_order::memory_order_seq_cst), 2u); + EXPECT_EQ(num_data_r_w_sends_s2.load(std::memory_order::memory_order_seq_cst), 2u); +} + +// This is a regression test for Redmine #23088, which corrects the following data race in the discovery server: +// a) When a participant is being removed (either because it is being deleted or because its lease duration has expired), +// it is not deleted from participants_ map until the server has received all ACKs from clients of the Data(Up). +// b) If the same participant is re-discovered (Data(p) received) while the server is waiting for the Data(Up) ACKs, +// it will not be updated in the participants_ map as ALIVE. +// c) If the Data(Up) ACKs and the Data(r/w) of the rediscovered participant are received at the same time, +// the server will delete the participant and try to process the Data(r/w) messages in the same routine. 
+// The server will try to register a reader/writer from a deleted participant, which will result in the error: +// "Matching unexisting participant from reader/writer". +// This test checks that this does not happen after fixing point b) and updating the participant to ALIVE. +TEST_P(Discovery, discovery_server_rediscover_participant_being_removed) +{ + // Skip test in intraprocess and datasharing mode + if (TRANSPORT != GetParam()) + { + GTEST_SKIP() << "Only makes sense on TRANSPORT"; + return; + } + + using namespace eprosima::fastdds::dds; + + // One discovery server will be created, with two direct clients connected to it. + // Client 1 will be removed and then relaunched, while the Client 2 will be kept alive. + // Client 2 ACKs will be blocked to simulate a slow discovery process. + + std::atomic filter_activated { false }; + auto block_data_up_acks = [&filter_activated](CDRMessage_t& msg) + { + // Filter Data(Up) ACKs messages + if (filter_activated.load(std::memory_order::memory_order_seq_cst)) + { + // Go back to submsgkind + auto submsgkind_pos = msg.pos - 4; + auto acknack_submsg = eprosima::fastdds::helpers::cdr_parse_acknack_submsg( + (char*)&msg.buffer[submsgkind_pos], + msg.length - submsgkind_pos); + + assert(acknack_submsg.submsgHeader().submessageId() == ACKNACK); + + if (eprosima::fastdds::rtps::c_EntityId_SPDPWriter == + *reinterpret_cast(&acknack_submsg.writerId()) || + eprosima::fastdds::rtps::c_EntityId_SEDPPubWriter == + *reinterpret_cast(&acknack_submsg.writerId())) + { + std::cout << "Blocking Data(Up) ACKs" << std::endl; + return true; + } + } + return false; + }; + + // Declare a test transport that will count the number of Data(p) messages sent + auto default_udp_transport = std::make_shared(); + auto test_transport = std::make_shared(); + test_transport->drop_ack_nack_messages_filter_ = [&](CDRMessage_t& msg) + { + return block_data_up_acks(msg); + }; + + // Create the main participant + auto server = std::make_shared>(0, 0, 0, 0); + + Locator_t locator_server; // UDPv4 locator by default + eprosima::fastdds::rtps::IPLocator::setIPv4(locator_server, 127, 0, 0, 1); + eprosima::fastdds::rtps::IPLocator::setPhysicalPort(locator_server, global_port); + + WireProtocolConfigQos server_wp_qos; + server_wp_qos.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol::SERVER; + server_wp_qos.builtin.metatrafficUnicastLocatorList.push_back(locator_server); + // Raise the routine period to control the discovery process + server_wp_qos.builtin.discovery_config.discoveryServer_client_syncperiod = { 5, 0 }; + server_wp_qos.builtin.discovery_config.leaseDuration = { 60, 0 }; + server_wp_qos.builtin.discovery_config.leaseDuration_announcementperiod = { 59, 0 }; + server_wp_qos.builtin.discovery_config.initial_announcements.count = 1; + + // The main participant will use the test transport and a specific announcements configuration + server->disable_builtin_transport() + .add_user_transport_to_pparams(default_udp_transport) + .wire_protocol(server_wp_qos); + + // Start the main participant + ASSERT_TRUE(server->init_participant()); + + // Create clients + std::shared_ptr> client_1 = + std::make_shared>(1u, 0u, 0u, 1u); + std::shared_ptr> client_2 = + std::make_shared>(0u, 1u, 1u, 0u); + // Set participant as client + WireProtocolConfigQos client_qos; + client_qos.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol::CLIENT; + client_qos.builtin.discovery_config.m_DiscoveryServers.push_back(locator_server); + client_qos.builtin.discovery_config.leaseDuration = 
{ 60, 0 }; + client_qos.builtin.discovery_config.leaseDuration_announcementperiod = { 59, 0 }; + client_qos.builtin.discovery_config.initial_announcements.count = 1; + std::istringstream("64.61.74.61.5f.72.61.63.65.5f.64.73") >> client_qos.prefix; + + // Init client 1 + client_1->wire_protocol(client_qos) + .setup_transports(eprosima::fastdds::rtps::BuiltinTransports::UDPv4) + .pub_topic_name(TEST_TOPIC_NAME); + ASSERT_TRUE(client_1->init_participant()); + ASSERT_TRUE(client_1->init_publisher(0u)); + + // Init client 2 + WireProtocolConfigQos client2_qos = client_qos; + std::istringstream("73.61.74.61.5f.72.61.63.65.5f.64.73") >> client2_qos.prefix; + client_2->wire_protocol(client2_qos) + .disable_builtin_transport() + .add_user_transport_to_pparams(test_transport) + .sub_topic_name(TEST_TOPIC_NAME); + ASSERT_TRUE(client_2->init_participant()); + ASSERT_TRUE(client_2->init_subscriber(0u)); + + // Wait at least the servers routine period to discover endpoints + ASSERT_TRUE(server->wait_discovery(std::chrono::seconds(6), 2, true)); + client_2->sub_wait_discovery(1); + client_1->pub_wait_discovery(1); + + // Server discovered both clients, activate filter to block Data(Up) ACKs + std::this_thread::sleep_for(std::chrono::seconds(1)); + std::cout << "Blocking Data(Up) ACKs activated" << std::endl; + filter_activated.store(true, std::memory_order::memory_order_seq_cst); + + // Remove client 1 + client_1.reset(); + ASSERT_TRUE(client_1 == nullptr); + // Ensure client 2 has unmatched the client 1 + client_2->sub_wait_discovery(0); + std::this_thread::sleep_for(std::chrono::seconds(3)); + + // Relaunch client 1 and deactivate the filter + client_1 = std::make_shared>(1u, 0u, 0u, 1u); + client_1->wire_protocol(client_qos) + .setup_transports(eprosima::fastdds::rtps::BuiltinTransports::UDPv4) + .pub_topic_name(TEST_TOPIC_NAME); + // Init only the participant, publisher will be initialized later + ASSERT_TRUE(client_1->init_participant()); + std::cout << "Blocking Data(Up) ACKs deactivated" << std::endl; + filter_activated.store(false, std::memory_order::memory_order_seq_cst); + + // Give time to receive Data(Up) ACK, process the Data(p) and update the participant, + // but do it before a whole period of the server's routine. + // In this way new Data(w) and Data(Up) ACK are processed in the same routine + std::this_thread::sleep_for(std::chrono::seconds(1)); + ASSERT_TRUE(client_1->init_publisher(0u)); + + // Client 2 should discover client 1 again and log error "Writer has no associated participant." should not appear + client_2->sub_wait_discovery(1); +} +>>>>>>> ec666f72 (Solve Discovery Server race conditions (#5780))
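
Editor's note: the ordering enforced by the new swap_data_queues() (swap EDP before PDP) is easier to see in isolation. Below is a minimal standalone sketch of that argument; DoubleBufferQueue and all names in it are hypothetical stand-ins, not the Fast DDS DBQueue API. Swapping the EDP queue first guarantees that any DATA(w)/DATA(r) made visible to a routine pass has its DATA(p) visible in the same or an earlier pass, because the DATA(p) is always queued before the endpoint data that depends on it.

```cpp
// Minimal, self-contained sketch of the queue-swap ordering rationale.
// DoubleBufferQueue is a hypothetical stand-in, NOT the Fast DDS DBQueue API.
#include <deque>
#include <iostream>
#include <mutex>
#include <string>

template <typename T>
class DoubleBufferQueue
{
public:
    void push(const T& value)              // producers (listeners) write to the back buffer
    {
        std::lock_guard<std::mutex> lock(mutex_);
        back_.push_back(value);
    }

    void swap()                            // consumer routine makes pending data visible
    {
        std::lock_guard<std::mutex> lock(mutex_);
        front_.swap(back_);
    }

    bool empty() const
    {
        return front_.empty();
    }

    T pop()
    {
        T value = front_.front();
        front_.pop_front();
        return value;
    }

private:
    std::mutex mutex_;
    std::deque<T> front_;   // consumed by the server routine
    std::deque<T> back_;    // filled by the listeners
};

DoubleBufferQueue<std::string> pdp_queue;   // DATA(p) / DATA(Up)
DoubleBufferQueue<std::string> edp_queue;   // DATA(w) / DATA(r)

// Swap EDP before PDP: any endpoint change visible in this pass was pushed
// before the EDP swap, so its DATA(p) (pushed even earlier) is necessarily
// visible in this pass's PDP front buffer or was processed in a previous pass.
void swap_data_queues()
{
    edp_queue.swap();
    pdp_queue.swap();
}

int main()
{
    pdp_queue.push("DATA(p) of participant A");
    edp_queue.push("DATA(w) of a writer of A");

    swap_data_queues();
    while (!pdp_queue.empty())
    {
        std::cout << "PDP: " << pdp_queue.pop() << std::endl;
    }
    while (!edp_queue.empty())
    {
        std::cout << "EDP: " << edp_queue.pop() << std::endl;
    }
    return 0;
}
```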
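
The guard added to create_writers_from_change_() / create_readers_from_change_() boils down to: look up the endpoint's participant first and, if it is unknown or its stored change is no longer ALIVE, log, queue the change for release and return. The following is a hedged sketch of that shape with simplified stand-in types; it is not the DiscoveryDataBase interface.

```cpp
// Sketch of the "abort endpoint processing if the participant is not alive" guard.
// Types and names are simplified stand-ins, not the DiscoveryDataBase API.
#include <iostream>
#include <map>
#include <string>
#include <vector>

enum class ChangeKind { ALIVE, NOT_ALIVE_DISPOSED };

struct CacheChange
{
    ChangeKind kind;
    std::string entity_guid;   // e.g. "participantX.writer1"
};

struct ParticipantInfo
{
    ChangeKind kind = ChangeKind::ALIVE;   // kind of the last DATA(p)/DATA(Up) stored
    std::vector<std::string> writers;
};

std::map<std::string, ParticipantInfo> participants;   // keyed by GUID prefix
std::vector<CacheChange*> changes_to_release;          // changes handed back for reuse

std::string guid_prefix_of(const std::string& entity_guid)
{
    return entity_guid.substr(0, entity_guid.find('.'));
}

// Returns false when the DATA(w) must be dropped because its participant is
// unknown or already disposed; the change is queued for release so the
// writer's history / change pool can reuse it.
bool create_writer_from_change(CacheChange* ch)
{
    auto it = participants.find(guid_prefix_of(ch->entity_guid));
    if (it == participants.end())
    {
        std::cout << "Writer " << ch->entity_guid << " has no associated participant. Skipping\n";
        changes_to_release.push_back(ch);
        return false;
    }
    if (it->second.kind != ChangeKind::ALIVE)
    {
        std::cout << "Writer " << ch->entity_guid << " is associated to a removed participant. Skipping\n";
        changes_to_release.push_back(ch);
        return false;
    }
    it->second.writers.push_back(ch->entity_guid);   // normal processing path
    return true;
}

int main()
{
    participants["participantX"].kind = ChangeKind::NOT_ALIVE_DISPOSED;
    CacheChange data_w{ChangeKind::ALIVE, "participantX.writer1"};
    create_writer_from_change(&data_w);              // skipped, change released
    std::cout << changes_to_release.size() << " change(s) queued for release\n";
    return 0;
}
```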
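
The DATA(Up) + DATA(p) race is handled in update_participant_from_change_() by detecting an ALIVE DATA(p) for a participant whose stored change is un-ALIVE (its DATA(Up) is still waiting for ACKs), dropping that pending disposal and treating the participant as newly discovered. A simplified sketch of that branch follows, again with hypothetical types rather than the real implementation.

```cpp
// Sketch of re-discovery of a participant whose DATA(Up) is still unacked.
// Simplified stand-in types only; not the DiscoveryDataBase implementation.
#include <algorithm>
#include <iostream>
#include <vector>

enum class ChangeKind { ALIVE, NOT_ALIVE_DISPOSED };

struct CacheChange
{
    ChangeKind kind;
    int sequence;
};

struct ParticipantInfo
{
    CacheChange* change;   // last DATA(p)/DATA(Up) stored for this participant
};

std::vector<CacheChange*> disposals;           // DATA(Up) waiting to be sent/acked
std::vector<CacheChange*> changes_to_release;  // old changes returned to the pool

// New ALIVE DATA(p) received for a participant whose stored change is un-ALIVE:
// behave as if the participant were discovered again instead of ignoring it.
void update_participant_from_change(ParticipantInfo& info, CacheChange* new_change)
{
    if (info.change->kind != ChangeKind::ALIVE && new_change->kind == ChangeKind::ALIVE)
    {
        // Drop the pending DATA(Up) so it is not sent for a participant that is alive again
        disposals.erase(std::remove(disposals.begin(), disposals.end(), info.change), disposals.end());

        // Replace the stored change and release the old one
        changes_to_release.push_back(info.change);
        info.change = new_change;

        std::cout << "Participant re-discovered before its DATA(Up) was acked\n";
        // ...from here on the participant is treated as newly discovered
        // (virtual endpoints re-created, DATA(p) relayed, ack flags reset).
    }
}

int main()
{
    CacheChange old_change{ChangeKind::NOT_ALIVE_DISPOSED, 1};
    CacheChange new_change{ChangeKind::ALIVE, 2};

    ParticipantInfo info;
    info.change = &old_change;
    disposals.push_back(&old_change);

    update_participant_from_change(info, &new_change);
    std::cout << "pending disposals: " << disposals.size() << "\n";   // 0
    return 0;
}
```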