Skip to content

Commit 2c7e56f

Browse files
Fix assertion on OutputTrafficManager (#5704) (#5711)
* Fix assertion on `OutputTrafficManager` (#5704) * Refs #22930. Add entry when missing on `set_statistics_message_data`. Signed-off-by: Miguel Company <[email protected]> * Refs #22930. Create sender resources on stateless readers. Signed-off-by: Miguel Company <[email protected]> * Refs #22930. Similar changes on writers. Signed-off-by: Miguel Company <[email protected]> * Refs #22930: Test TCP with best effort reader Signed-off-by: cferreiragonz <[email protected]> * Refs #22930: Uncrustify Signed-off-by: cferreiragonz <[email protected]> --------- Signed-off-by: Miguel Company <[email protected]> Signed-off-by: cferreiragonz <[email protected]> Co-authored-by: cferreiragonz <[email protected]> (cherry picked from commit 777a71b) # Conflicts: # src/cpp/rtps/reader/StatelessReader.cpp # src/cpp/rtps/writer/StatelessWriter.cpp * Fix conflicts --------- Co-authored-by: Miguel Company <[email protected]>
1 parent 7e8717e commit 2c7e56f

File tree

7 files changed

+102
-12
lines changed

7 files changed

+102
-12
lines changed

src/cpp/rtps/participant/RTPSParticipantImpl.cpp

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1991,6 +1991,28 @@ void RTPSParticipantImpl::createSenderResources(
19911991
m_network_Factory.build_send_resources(send_resource_list_, locator_selector_entry);
19921992
}
19931993

1994+
void RTPSParticipantImpl::createSenderResources(
1995+
const RemoteLocatorList& locator_list,
1996+
const EndpointAttributes& param)
1997+
{
1998+
using network::external_locators::filter_remote_locators;
1999+
2000+
LocatorSelectorEntry entry(locator_list.unicast.size(), locator_list.multicast.size());
2001+
entry.multicast = locator_list.multicast;
2002+
entry.unicast = locator_list.unicast;
2003+
filter_remote_locators(entry, param.external_unicast_locators, param.ignore_non_matching_locators);
2004+
2005+
std::lock_guard<std::timed_mutex> lock(m_send_resources_mutex_);
2006+
for (const Locator_t& locator : entry.unicast)
2007+
{
2008+
m_network_Factory.build_send_resources(send_resource_list_, locator);
2009+
}
2010+
for (const Locator_t& locator : entry.multicast)
2011+
{
2012+
m_network_Factory.build_send_resources(send_resource_list_, locator);
2013+
}
2014+
}
2015+
19942016
bool RTPSParticipantImpl::deleteUserEndpoint(
19952017
const GUID_t& endpoint)
19962018
{

src/cpp/rtps/participant/RTPSParticipantImpl.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ namespace rtps {
115115

116116
struct PublicationBuiltinTopicData;
117117
struct TopicDescription;
118+
struct RemoteLocatorList;
118119
class RTPSParticipant;
119120
class RTPSParticipantListener;
120121
class BuiltinProtocols;
@@ -1045,6 +1046,10 @@ class RTPSParticipantImpl
10451046
void createSenderResources(
10461047
const Locator_t& locator);
10471048

1049+
void createSenderResources(
1050+
const RemoteLocatorList& locator_list,
1051+
const EndpointAttributes& param);
1052+
10481053
/**
10491054
* Creates sender resources for the given locator selector entry by calling the NetworkFactory's
10501055
* build_send_resources method.

src/cpp/rtps/reader/StatelessReader.cpp

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,9 @@ bool StatelessReader::matched_writer_add_edp(
107107
std::unique_lock<RecursiveTimedMutex> guard(mp_mutex);
108108
listener = listener_;
109109

110+
bool is_same_process = RTPSDomainImpl::should_intraprocess_between(m_guid, wdata.guid());
111+
bool is_datasharing = is_datasharing_compatible_with(wdata);
112+
110113
for (RemoteWriterInfo_t& writer : matched_writers_)
111114
{
112115
if (writer.guid == wdata.guid())
@@ -121,6 +124,11 @@ bool StatelessReader::matched_writer_add_edp(
121124
}
122125
writer.ownership_strength = wdata.m_qos.m_ownershipStrength.value;
123126

127+
if (!is_same_process && !is_datasharing)
128+
{
129+
mp_RTPSParticipant->createSenderResources(wdata.remote_locators(), m_att);
130+
}
131+
124132
if (nullptr != listener)
125133
{
126134
// call the listener without the lock taken
@@ -144,9 +152,6 @@ bool StatelessReader::matched_writer_add_edp(
144152
}
145153
}
146154

147-
bool is_same_process = RTPSDomainImpl::should_intraprocess_between(m_guid, wdata.guid());
148-
bool is_datasharing = is_datasharing_compatible_with(wdata);
149-
150155
RemoteWriterInfo_t info;
151156
info.guid = wdata.guid();
152157
info.persistence_guid = wdata.persistence_guid();
@@ -195,6 +200,11 @@ bool StatelessReader::matched_writer_add_edp(
195200
// this has to be done after the writer is added to the matched_writers or the processing may fail
196201
datasharing_listener_->notify(false);
197202
}
203+
204+
if (!is_same_process && !is_datasharing)
205+
{
206+
mp_RTPSParticipant->createSenderResources(wdata.remote_locators(), m_att);
207+
}
198208
}
199209

200210
if (liveliness_lease_duration_ < dds::c_TimeInfinite)

src/cpp/rtps/writer/StatefulWriter.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -983,6 +983,7 @@ bool StatefulWriter::matched_reader_add_edp(
983983
m_att.external_unicast_locators, m_att.ignore_non_matching_locators);
984984
filter_remote_locators(*reader->async_locator_selector_entry(),
985985
m_att.external_unicast_locators, m_att.ignore_non_matching_locators);
986+
mp_RTPSParticipant->createSenderResources(rdata.remote_locators(), m_att);
986987
update_reader_info(locator_selector_general_, true);
987988
update_reader_info(locator_selector_async_, true);
988989
}
@@ -1069,6 +1070,7 @@ bool StatefulWriter::matched_reader_add_edp(
10691070
}
10701071
}
10711072

1073+
mp_RTPSParticipant->createSenderResources(rdata.remote_locators(), m_att);
10721074
update_reader_info(locator_selector_general_, true);
10731075
update_reader_info(locator_selector_async_, true);
10741076

src/cpp/rtps/writer/StatelessWriter.cpp

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -455,6 +455,7 @@ bool StatelessWriter::matched_reader_add_edp(
455455
{
456456
filter_remote_locators(*reader.general_locator_selector_entry(),
457457
m_att.external_unicast_locators, m_att.ignore_non_matching_locators);
458+
mp_RTPSParticipant->createSenderResources(data.remote_locators(), m_att);
458459
update_reader_info(true);
459460
}
460461
return true;
@@ -543,14 +544,7 @@ bool StatelessWriter::matched_reader_add_edp(
543544
}
544545

545546
// Create sender resources for the case when we send to a single reader
546-
locator_selector_.locator_selector.reset(false);
547-
locator_selector_.locator_selector.enable(data.guid());
548-
mp_RTPSParticipant->network_factory().select_locators(locator_selector_.locator_selector);
549-
RTPSParticipantImpl* part = mp_RTPSParticipant;
550-
locator_selector_.locator_selector.for_each([part](const Locator_t& loc)
551-
{
552-
part->createSenderResources(loc);
553-
});
547+
mp_RTPSParticipant->createSenderResources(data.remote_locators(), m_att);
554548

555549
// Create sender resources for the case when we send to all readers
556550
update_reader_info(true);

src/cpp/statistics/rtps/messages/OutputTrafficManager.hpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include <utility>
2626

2727
#include <fastdds/config.hpp>
28+
#include <fastdds/dds/log/Log.hpp>
2829
#include <fastdds/rtps/common/Locator.hpp>
2930

3031
#include <statistics/rtps/messages/RTPSStatisticsMessages.hpp>
@@ -83,7 +84,12 @@ class OutputTrafficManager
8384
return locator == entry.first;
8485
};
8586
auto it = std::find_if(collection_.begin(), collection_.end(), search);
86-
assert(it != collection_.end());
87+
if (it == collection_.end())
88+
{
89+
EPROSIMA_LOG_ERROR(RTPS_OUT,
90+
"Locator '" << locator << "' not found in collection. Adding entry.");
91+
it = collection_.insert(it, entry_type(locator, value_type{}));
92+
}
8793
set_statistics_submessage_from_transport(locator, send_buffer, total_bytes, it->second);
8894
#endif // FASTDDS_STATISTICS
8995
}

test/blackbox/common/BlackboxTestsTransportTCP.cpp

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1506,6 +1506,57 @@ TEST_P(TransportTCP, tcp_unique_network_flows_communication)
15061506
EXPECT_TRUE(writer.waitForAllAcked(std::chrono::seconds(30)));
15071507
}
15081508

1509+
/**
1510+
* This verifies that a best effort reader is capable of creating resources when a new locator
1511+
* is received along a Data(W) in order to start communication. This will ensure the creation a new connect channel.
1512+
* The reader must have the lowest listening port to force the participant to create the channel.
1513+
*/
1514+
TEST_P(TransportTCP, best_effort_reader_tcp_resources_creation)
1515+
{
1516+
PubSubWriter<HelloWorldPubSubType> writer(TEST_TOPIC_NAME);
1517+
PubSubReader<HelloWorldPubSubType> reader(TEST_TOPIC_NAME);
1518+
1519+
// Large data setup is reused to enable UDP for multicast and TCP for data.
1520+
// However, the metatraffic unicast needs to be replaced for UDP to ensure that the TCP
1521+
// locator is not announced in the Data(P) (In large data the metatraffic unicast is TCP).
1522+
LocatorList metatraffic_unicast;
1523+
eprosima::fastdds::rtps::Locator_t udp_locator;
1524+
udp_locator.kind = LOCATOR_KIND_UDPv4;
1525+
eprosima::fastdds::rtps::IPLocator::setIPv4(udp_locator, "127.0.0.1");
1526+
metatraffic_unicast.push_back(udp_locator);
1527+
1528+
// Writer with highest listening port will wait for connection
1529+
writer.setup_large_data_tcp(use_ipv6, global_port + 1)
1530+
.metatraffic_unicast_locator_list(metatraffic_unicast)
1531+
.init();
1532+
1533+
// Reader with lowest listening port to force the connection channel creation
1534+
reader.setup_large_data_tcp(use_ipv6, global_port)
1535+
.reliability(eprosima::fastdds::dds::ReliabilityQosPolicyKind::BEST_EFFORT_RELIABILITY_QOS)
1536+
.metatraffic_unicast_locator_list(metatraffic_unicast)
1537+
.init();
1538+
1539+
ASSERT_TRUE(writer.isInitialized());
1540+
ASSERT_TRUE(reader.isInitialized());
1541+
1542+
writer.wait_discovery(std::chrono::seconds(5));
1543+
reader.wait_discovery(std::chrono::seconds(5));
1544+
1545+
ASSERT_EQ(writer.get_matched(), 1u);
1546+
ASSERT_EQ(reader.get_matched(), 1u);
1547+
1548+
// Although participants have matched, the TCP connection might not be established yet.
1549+
// This active wait ensures the connection had time to be established before sending non-reliable samples.
1550+
std::this_thread::sleep_for(std::chrono::seconds(3));
1551+
1552+
auto data = default_helloworld_data_generator();
1553+
reader.startReception(data);
1554+
writer.send(data);
1555+
ASSERT_TRUE(data.empty());
1556+
1557+
reader.block_for_all();
1558+
}
1559+
15091560
#ifdef INSTANTIATE_TEST_SUITE_P
15101561
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
15111562
#else

0 commit comments

Comments
 (0)