Skip to content

Commit 657c521

Browse files
Decouple transport receivers creation using unique network flows (#5583) (#5591)
* Refs #22519. Add regression test Signed-off-by: Juan Lopez Fernandez <[email protected]> * Refs #22519. Decouple transport receivers creation using unique network flows Signed-off-by: Juan Lopez Fernandez <[email protected]> * Refs #22519. Add comment for future developers Signed-off-by: Juan Lopez Fernandez <[email protected]> * Refs #22519. Apply suggestion Signed-off-by: Juan Lopez Fernandez <[email protected]> * Refs #22519. Reuse unique ports for locators of same kind in a reader Signed-off-by: Juan Lopez Fernandez <[email protected]> --------- Signed-off-by: Juan Lopez Fernandez <[email protected]> (cherry picked from commit e6e918f) Co-authored-by: juanlofer-eprosima <[email protected]>
1 parent 8deaa69 commit 657c521

File tree

3 files changed

+148
-17
lines changed

3 files changed

+148
-17
lines changed

src/cpp/rtps/participant/RTPSParticipantImpl.cpp

Lines changed: 45 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
#include <algorithm>
2121
#include <functional>
22+
#include <map>
2223
#include <memory>
2324
#include <mutex>
2425
#include <sstream>
@@ -1837,42 +1838,69 @@ bool RTPSParticipantImpl::createAndAssociateReceiverswithEndpoint(
18371838
if (unique_flows)
18381839
{
18391840
attributes.multicastLocatorList.clear();
1840-
attributes.unicastLocatorList = m_att.defaultUnicastLocatorList;
1841+
attributes.unicastLocatorList.clear();
18411842
attributes.external_unicast_locators.clear();
18421843

1843-
uint16_t port = initial_unique_port;
1844-
while (port < final_unique_port)
1844+
// Register created resources to distinguish the case where a receiver was created in this same function call
1845+
// (and can be reused for other locators of the same kind in this reader), and that in which it was already
1846+
// created before for other reader in this same participant.
1847+
std::map<int32_t, int16_t> created_resources;
1848+
1849+
// Create unique flows for unicast locators
1850+
LocatorList_t input_locator_list = m_att.defaultUnicastLocatorList;
1851+
for (Locator_t& loc : input_locator_list)
18451852
{
1846-
// Set port on unicast locators
1847-
for (Locator_t& loc : attributes.unicastLocatorList)
1853+
uint16_t port = created_resources.count(loc.kind) ? created_resources[loc.kind] : initial_unique_port;
1854+
while (port < final_unique_port)
18481855
{
18491856
// Set logical port only TCP locators
18501857
if (LOCATOR_KIND_TCPv4 == loc.kind || LOCATOR_KIND_TCPv6 == loc.kind)
18511858
{
1859+
// Due to current implementation limitations only one physical port (actual socket receiver)
1860+
// is allowed when using TCP tranport. All we can do for now is to create a unique "logical" flow.
1861+
// TODO(juanlofer): create a unique dedicated TCP communication channel once this limitation is removed.
18521862
IPLocator::setLogicalPort(loc, port);
18531863
}
18541864
else
18551865
{
18561866
loc.port = port;
18571867
}
1868+
1869+
// Try creating receiver for this locator
1870+
LocatorList_t aux_locator_list;
1871+
aux_locator_list.push_back(loc);
1872+
if (createReceiverResources(aux_locator_list, false, true, false))
1873+
{
1874+
created_resources[loc.kind] = port;
1875+
}
1876+
1877+
// Locator will be present in the list if receiver was created, or was already created
1878+
// Continue if receiver not created for this reader (might exist but created for other reader in this same participant)
1879+
if (!aux_locator_list.empty() &&
1880+
created_resources.count(loc.kind) && (created_resources[loc.kind] == port))
1881+
{
1882+
break;
1883+
}
1884+
1885+
// Try with next port
1886+
++port;
18581887
}
18591888

1860-
// Try creating receiver resources
1861-
LocatorList_t aux_locator_list = attributes.unicastLocatorList;
1862-
if (createReceiverResources(aux_locator_list, false, true, false))
1889+
// Fail when unique ports are exhausted
1890+
if (port >= final_unique_port)
18631891
{
1864-
break;
1892+
EPROSIMA_LOG_WARNING(RTPS_PARTICIPANT, "Unique flows requested but exhausted. Port range: "
1893+
<< initial_unique_port << "-" << final_unique_port << ". Discarding locator: " << loc);
1894+
}
1895+
else
1896+
{
1897+
attributes.unicastLocatorList.push_back(loc);
18651898
}
1866-
1867-
// Try with next port
1868-
++port;
18691899
}
18701900

1871-
// Fail when unique ports are exhausted
1872-
if (port >= final_unique_port)
1901+
if (attributes.unicastLocatorList.empty())
18731902
{
1874-
EPROSIMA_LOG_ERROR(RTPS_PARTICIPANT, "Unique flows requested but exhausted. Port range: "
1875-
<< initial_unique_port << "-" << final_unique_port);
1903+
EPROSIMA_LOG_ERROR(RTPS_PARTICIPANT, "No unicast locators to create unique flows");
18761904
return false;
18771905
}
18781906
}
@@ -1983,7 +2011,7 @@ bool RTPSParticipantImpl::createReceiverResources(
19832011
bool ret_val = input_list.empty();
19842012

19852013
#if HAVE_SECURITY
1986-
// An auxilary buffer is needed in the ReceiverResource to to decrypt the message,
2014+
// An auxilary buffer is needed in the ReceiverResource to decrypt the message,
19872015
// that imposes a limit in the received messages size even if the transport allows (uint32_t) messages size.
19882016
uint32_t max_receiver_buffer_size =
19892017
is_secure() ? std::numeric_limits<uint16_t>::max() : (std::numeric_limits<uint32_t>::max)();

src/cpp/rtps/participant/RTPSParticipantImpl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1023,6 +1023,7 @@ class RTPSParticipantImpl
10231023
* @param ApplyMutation - True if we want to create a Resource with a "similar" locator if the one we provide is unavailable
10241024
* @param RegisterReceiver - True if we want the receiver to be registered. Useful for receivers created after participant is enabled.
10251025
* @param log_when_creation_fails - True if a log warning shall be issued for each locator when a receiver resource cannot be created.
1026+
* @return True if a receiver resource was created for at least a locator in the list, false otherwise.
10261027
*/
10271028
bool createReceiverResources(
10281029
LocatorList_t& Locator_list,

test/blackbox/common/BlackboxTestsNetworkConf.cpp

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include "PubSubParticipant.hpp"
2323

2424
#include <fastrtps/rtps/common/Locator.h>
25+
#include <fastdds/rtps/transport/shared_mem/SharedMemTransportDescriptor.h>
2526
#include <fastrtps/utils/IPFinder.h>
2627

2728
using namespace eprosima::fastrtps;
@@ -165,6 +166,107 @@ TEST_P(NetworkConfig, sub_unique_network_flows)
165166
}
166167
}
167168

169+
// Regression test for redmine issue #22519 to check that readers using unique network flows cannot share locators
170+
// with other readers. The mentioned issue referred to the case where TCP + builtin transports are present.
171+
// In that concrete scenario, the problem was that while the TCP (and UDP) transports rightly were able
172+
// to create a receiver in the dedicated "unique flow" port, shared memory failed for that same port as the other
173+
// process (or participant) is already listening on it. However this was not being handled properly, so once matched,
174+
// the publisher attempts to send data to the wrongfully announced shared memory locator.
175+
// Note that the underlying problem is that, when creating unique network flows, all transports are requested to
176+
// create a receiver for a specific port all together. This is, the creation of unique flow receivers is only
177+
// considered to fail when it fails for all transports, instead of decoupling them and keep trying for alternative
178+
// ports when the creation of a specific transport receiver fails.
179+
// In this test a similar scenario is presented, but using instead UDP and shared memory transports. In the first
180+
// participant, only shared memory is used (which should create a SHM receiver in the first "unique" port attempted).
181+
// In the second participant both UDP and shared memory are used (which should create a UDP receiver in the first
182+
// "unique" port attempted, and a shared memory receiver in the second "unique" port attempted, as the first one is
183+
// already being used by the first participant). As a result, the listening shared memory locators of each data
184+
// reader should be different. Finally, a third data reader is created in the second participant, and it is verified
185+
// that its listening locators are different from those of the other reader created in the same participant, as well as
186+
// from the (SHM) one of the reader created in the first participant.
187+
TEST_P(NetworkConfig, sub_unique_network_flows_multiple_locators)
188+
{
189+
// Enable unique network flows feature
190+
PropertyPolicy properties;
191+
properties.properties().emplace_back("fastdds.unique_network_flows", "");
192+
193+
// First participant
194+
PubSubParticipant<HelloWorldPubSubType> participant(0, 1, 0, 0);
195+
196+
participant.sub_topic_name(TEST_TOPIC_NAME).sub_property_policy(properties);
197+
198+
std::shared_ptr<eprosima::fastdds::rtps::SharedMemTransportDescriptor> shm_descriptor =
199+
std::make_shared<eprosima::fastdds::rtps::SharedMemTransportDescriptor>();
200+
// Use only SHM transport in the first participant
201+
participant.disable_builtin_transport().add_user_transport_to_pparams(shm_descriptor);
202+
203+
ASSERT_TRUE(participant.init_participant());
204+
ASSERT_TRUE(participant.init_subscriber(0));
205+
206+
LocatorList_t locators;
207+
208+
participant.get_native_reader(0).get_listening_locators(locators);
209+
ASSERT_EQ(locators.size(), 1u);
210+
ASSERT_EQ((*locators.begin()).kind, LOCATOR_KIND_SHM);
211+
212+
// Second participant
213+
PubSubParticipant<HelloWorldPubSubType> participant2(0, 2, 0, 0);
214+
215+
participant2.sub_topic_name(TEST_TOPIC_NAME).sub_property_policy(properties);
216+
217+
// Use both UDP and SHM in the second participant
218+
if (!use_udpv4)
219+
{
220+
participant2.disable_builtin_transport().add_user_transport_to_pparams(descriptor_).
221+
add_user_transport_to_pparams(shm_descriptor);
222+
}
223+
224+
ASSERT_TRUE(participant2.init_participant());
225+
ASSERT_TRUE(participant2.init_subscriber(0));
226+
227+
LocatorList_t locators2_1;
228+
229+
participant2.get_native_reader(0).get_listening_locators(locators2_1);
230+
ASSERT_TRUE(locators2_1.size() >= 2u); // There should be at least two locators, one for SHM and N(#interfaces) for UDP
231+
232+
// Check SHM locator is different from the one in the first participant
233+
for (const Locator_t& loc : locators2_1)
234+
{
235+
if (LOCATOR_KIND_SHM == loc.kind)
236+
{
237+
// Ports should be different (expected second and first values of the unique network flows port range)
238+
ASSERT_FALSE(loc == *locators.begin());
239+
}
240+
}
241+
242+
// Now create a second reader in the second participant
243+
ASSERT_TRUE(participant2.init_subscriber(1));
244+
245+
LocatorList_t locators2_2;
246+
247+
participant2.get_native_reader(1).get_listening_locators(locators2_2);
248+
ASSERT_TRUE(locators2_2.size() >= 2u); // There should be at least two locators, one for SHM and N(#interfaces) for UDP
249+
250+
// Check SHM locator is different from the one in the first participant
251+
for (const Locator_t& loc : locators2_2)
252+
{
253+
if (LOCATOR_KIND_SHM == loc.kind)
254+
{
255+
// Ports should be different (expected third and first values of the unique network flows port range)
256+
ASSERT_FALSE(loc == *locators.begin());
257+
}
258+
}
259+
260+
// Now check no locators are shared between the two readers in the second participant
261+
for (const Locator_t& loc_1 : locators2_1)
262+
{
263+
for (const Locator_t& loc_2 : locators2_2)
264+
{
265+
ASSERT_FALSE(loc_1 == loc_2);
266+
}
267+
}
268+
}
269+
168270
//Verify that outLocatorList is used to select the desired output channel
169271
TEST_P(NetworkConfig, PubSubOutLocatorSelection)
170272
{

0 commit comments

Comments
 (0)