Skip to content

[23001] 2.14.x - Avoid sending statistics msg with big msgs and no fragmentation #5743

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: 2.14.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/cpp/rtps/messages/RTPSMessageGroup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,12 @@ void RTPSMessageGroup::send()
}
#endif // if HAVE_SECURITY

eprosima::fastdds::statistics::rtps::add_statistics_submessage(msgToSend);
if (msgToSend->length <
static_cast<uint32_t>(std::numeric_limits<uint16_t>::max() - RTPSMESSAGE_DATA_MIN_LENGTH))
{
// Avoid sending the data message for DATA that are not fragmented and exceed the 65 kB limit
eprosima::fastdds::statistics::rtps::add_statistics_submessage(msgToSend);
}

if (!sender_->send(msgToSend,
max_blocking_time_point_))
Expand Down
6 changes: 6 additions & 0 deletions test/blackbox/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ set(RTPS_BLACKBOXTESTS_SOURCE ${RTPS_BLACKBOXTESTS_TEST_SOURCE}
types/Data1mb.cxx
types/Data1mbPubSubTypes.cxx
types/Data1mbv1.cxx
types/Data100kb.cxx
types/Data100kbPubSubTypes.cxx
types/Data100kbv1.cxx
types/Data64kb.cxx
types/Data64kbPubSubTypes.cxx
types/Data64kbv1.cxx
Expand Down Expand Up @@ -116,6 +119,9 @@ set(BLACKBOXTESTS_SOURCE ${BLACKBOXTESTS_TEST_SOURCE}
types/Data1mb.cxx
types/Data1mbPubSubTypes.cxx
types/Data1mbv1.cxx
types/Data100kb.cxx
types/Data100kbPubSubTypes.cxx
types/Data100kbv1.cxx
types/Data64kb.cxx
types/Data64kbPubSubTypes.cxx
types/Data64kbv1.cxx
Expand Down
21 changes: 15 additions & 6 deletions test/blackbox/api/dds-pim/PubSubReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ using eprosima::fastrtps::rtps::IPLocator;
using eprosima::fastdds::rtps::UDPTransportDescriptor;
using eprosima::fastdds::rtps::UDPv4TransportDescriptor;
using eprosima::fastdds::rtps::UDPv6TransportDescriptor;
using eprosima::fastdds::rtps::BuiltinTransports;
using eprosima::fastdds::rtps::BuiltinTransportsOptions;

using SampleLostStatusFunctor = std::function<void (const eprosima::fastdds::dds::SampleLostStatus&)>;
using SampleRejectedStatusFunctor = std::function<void (const eprosima::fastdds::dds::SampleRejectedStatus&)>;
Expand Down Expand Up @@ -1029,15 +1031,15 @@ class PubSubReader
}

PubSubReader& setup_transports(
eprosima::fastdds::rtps::BuiltinTransports transports)
BuiltinTransports transports)
{
participant_qos_.setup_transports(transports);
return *this;
}

PubSubReader& setup_transports(
eprosima::fastdds::rtps::BuiltinTransports transports,
const eprosima::fastdds::rtps::BuiltinTransportsOptions& options)
BuiltinTransports transports,
const BuiltinTransportsOptions& options)
{
participant_qos_.setup_transports(transports, options);
return *this;
Expand All @@ -1046,9 +1048,10 @@ class PubSubReader
PubSubReader& setup_large_data_tcp(
bool v6 = false,
const uint16_t& port = 0,
const uint32_t& tcp_negotiation_timeout = 0)
const BuiltinTransportsOptions& options = BuiltinTransportsOptions())
{
participant_qos_.transport().use_builtin_transports = false;
participant_qos_.transport().max_msg_size_no_frag = options.maxMessageSize;

/* Transports configuration */
// UDP transport for PDP over multicast
Expand All @@ -1066,7 +1069,10 @@ class PubSubReader
data_transport->check_crc = false;
data_transport->apply_security = false;
data_transport->enable_tcp_nodelay = true;
data_transport->tcp_negotiation_timeout = tcp_negotiation_timeout;
data_transport->maxMessageSize = options.maxMessageSize;
data_transport->sendBufferSize = options.sockets_buffer_size;
data_transport->receiveBufferSize = options.sockets_buffer_size;
data_transport->tcp_negotiation_timeout = options.tcp_negotiation_timeout;
participant_qos_.transport().user_transports.push_back(data_transport);
}
else
Expand All @@ -1080,7 +1086,10 @@ class PubSubReader
data_transport->check_crc = false;
data_transport->apply_security = false;
data_transport->enable_tcp_nodelay = true;
data_transport->tcp_negotiation_timeout = tcp_negotiation_timeout;
data_transport->maxMessageSize = options.maxMessageSize;
data_transport->sendBufferSize = options.sockets_buffer_size;
data_transport->receiveBufferSize = options.sockets_buffer_size;
data_transport->tcp_negotiation_timeout = options.tcp_negotiation_timeout;
participant_qos_.transport().user_transports.push_back(data_transport);
}

Expand Down
27 changes: 21 additions & 6 deletions test/blackbox/api/dds-pim/PubSubWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ using eprosima::fastrtps::rtps::IPLocator;
using eprosima::fastdds::rtps::UDPTransportDescriptor;
using eprosima::fastdds::rtps::UDPv4TransportDescriptor;
using eprosima::fastdds::rtps::UDPv6TransportDescriptor;
using eprosima::fastdds::rtps::BuiltinTransports;
using eprosima::fastdds::rtps::BuiltinTransportsOptions;

template<class TypeSupport, typename TypeTraits = PubSubTypeTraits<TypeSupport>>
class PubSubWriter
Expand Down Expand Up @@ -986,15 +988,15 @@ class PubSubWriter
}

PubSubWriter& setup_transports(
eprosima::fastdds::rtps::BuiltinTransports transports)
BuiltinTransports transports)
{
participant_qos_.setup_transports(transports);
return *this;
}

PubSubWriter& setup_transports(
eprosima::fastdds::rtps::BuiltinTransports transports,
const eprosima::fastdds::rtps::BuiltinTransportsOptions& options)
BuiltinTransports transports,
const BuiltinTransportsOptions& options)
{
participant_qos_.setup_transports(transports, options);
return *this;
Expand All @@ -1003,9 +1005,10 @@ class PubSubWriter
PubSubWriter& setup_large_data_tcp(
bool v6 = false,
const uint16_t& port = 0,
const uint32_t& tcp_negotiation_timeout = 0)
const BuiltinTransportsOptions& options = BuiltinTransportsOptions())
{
participant_qos_.transport().use_builtin_transports = false;
participant_qos_.transport().max_msg_size_no_frag = options.maxMessageSize;

/* Transports configuration */
// UDP transport for PDP over multicast
Expand All @@ -1015,6 +1018,9 @@ class PubSubWriter
if (v6)
{
auto pdp_transport = std::make_shared<eprosima::fastdds::rtps::UDPv6TransportDescriptor>();
pdp_transport->maxMessageSize = options.maxMessageSize;
pdp_transport->sendBufferSize = options.sockets_buffer_size;
pdp_transport->receiveBufferSize = options.sockets_buffer_size;
participant_qos_.transport().user_transports.push_back(pdp_transport);

auto data_transport = std::make_shared<eprosima::fastdds::rtps::TCPv6TransportDescriptor>();
Expand All @@ -1023,12 +1029,18 @@ class PubSubWriter
data_transport->check_crc = false;
data_transport->apply_security = false;
data_transport->enable_tcp_nodelay = true;
data_transport->tcp_negotiation_timeout = tcp_negotiation_timeout;
data_transport->maxMessageSize = options.maxMessageSize;
data_transport->sendBufferSize = options.sockets_buffer_size;
data_transport->receiveBufferSize = options.sockets_buffer_size;
data_transport->tcp_negotiation_timeout = options.tcp_negotiation_timeout;
participant_qos_.transport().user_transports.push_back(data_transport);
}
else
{
auto pdp_transport = std::make_shared<eprosima::fastdds::rtps::UDPv4TransportDescriptor>();
pdp_transport->maxMessageSize = options.maxMessageSize;
pdp_transport->sendBufferSize = options.sockets_buffer_size;
pdp_transport->receiveBufferSize = options.sockets_buffer_size;
participant_qos_.transport().user_transports.push_back(pdp_transport);

auto data_transport = std::make_shared<eprosima::fastdds::rtps::TCPv4TransportDescriptor>();
Expand All @@ -1037,7 +1049,10 @@ class PubSubWriter
data_transport->check_crc = false;
data_transport->apply_security = false;
data_transport->enable_tcp_nodelay = true;
data_transport->tcp_negotiation_timeout = tcp_negotiation_timeout;
data_transport->maxMessageSize = options.maxMessageSize;
data_transport->sendBufferSize = options.sockets_buffer_size;
data_transport->receiveBufferSize = options.sockets_buffer_size;
data_transport->tcp_negotiation_timeout = options.tcp_negotiation_timeout;
participant_qos_.transport().user_transports.push_back(data_transport);
}

Expand Down
27 changes: 21 additions & 6 deletions test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ using eprosima::fastrtps::rtps::IPLocator;
using eprosima::fastrtps::rtps::UDPTransportDescriptor;
using eprosima::fastrtps::rtps::UDPv4TransportDescriptor;
using eprosima::fastrtps::rtps::UDPv6TransportDescriptor;
using eprosima::fastdds::rtps::BuiltinTransports;
using eprosima::fastdds::rtps::BuiltinTransportsOptions;

template<class TypeSupport>
class PubSubReader
Expand Down Expand Up @@ -749,15 +751,15 @@ class PubSubReader
}

PubSubReader& setup_transports(
eprosima::fastdds::rtps::BuiltinTransports transports)
BuiltinTransports transports)
{
participant_attr_.rtps.setup_transports(transports);
return *this;
}

PubSubReader& setup_transports(
eprosima::fastdds::rtps::BuiltinTransports transports,
const eprosima::fastdds::rtps::BuiltinTransportsOptions& options)
BuiltinTransports transports,
const BuiltinTransportsOptions& options)
{
participant_attr_.rtps.setup_transports(transports, options);
return *this;
Expand All @@ -766,9 +768,10 @@ class PubSubReader
PubSubReader& setup_large_data_tcp(
bool v6 = false,
const uint16_t& port = 0,
const uint32_t& tcp_negotiation_timeout = 0)
const BuiltinTransportsOptions& options = BuiltinTransportsOptions())
{
participant_attr_.rtps.useBuiltinTransports = false;
participant_attr_.rtps.max_msg_size_no_frag = options.maxMessageSize;

/* Transports configuration */
// UDP transport for PDP over multicast
Expand All @@ -778,6 +781,9 @@ class PubSubReader
if (v6)
{
auto pdp_transport = std::make_shared<eprosima::fastdds::rtps::UDPv6TransportDescriptor>();
pdp_transport->maxMessageSize = options.maxMessageSize;
pdp_transport->sendBufferSize = options.sockets_buffer_size;
pdp_transport->receiveBufferSize = options.sockets_buffer_size;
participant_attr_.rtps.userTransports.push_back(pdp_transport);

auto data_transport = std::make_shared<eprosima::fastdds::rtps::TCPv6TransportDescriptor>();
Expand All @@ -786,12 +792,18 @@ class PubSubReader
data_transport->check_crc = false;
data_transport->apply_security = false;
data_transport->enable_tcp_nodelay = true;
data_transport->tcp_negotiation_timeout = tcp_negotiation_timeout;
data_transport->maxMessageSize = options.maxMessageSize;
data_transport->sendBufferSize = options.sockets_buffer_size;
data_transport->receiveBufferSize = options.sockets_buffer_size;
data_transport->tcp_negotiation_timeout = options.tcp_negotiation_timeout;
participant_attr_.rtps.userTransports.push_back(data_transport);
}
else
{
auto pdp_transport = std::make_shared<eprosima::fastdds::rtps::UDPv4TransportDescriptor>();
pdp_transport->maxMessageSize = options.maxMessageSize;
pdp_transport->sendBufferSize = options.sockets_buffer_size;
pdp_transport->receiveBufferSize = options.sockets_buffer_size;
participant_attr_.rtps.userTransports.push_back(pdp_transport);

auto data_transport = std::make_shared<eprosima::fastdds::rtps::TCPv4TransportDescriptor>();
Expand All @@ -800,7 +812,10 @@ class PubSubReader
data_transport->check_crc = false;
data_transport->apply_security = false;
data_transport->enable_tcp_nodelay = true;
data_transport->tcp_negotiation_timeout = tcp_negotiation_timeout;
data_transport->maxMessageSize = options.maxMessageSize;
data_transport->sendBufferSize = options.sockets_buffer_size;
data_transport->receiveBufferSize = options.sockets_buffer_size;
data_transport->tcp_negotiation_timeout = options.tcp_negotiation_timeout;
participant_attr_.rtps.userTransports.push_back(data_transport);
}

Expand Down
27 changes: 21 additions & 6 deletions test/blackbox/api/fastrtps_deprecated/PubSubWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ using eprosima::fastrtps::rtps::IPLocator;
using eprosima::fastrtps::rtps::UDPTransportDescriptor;
using eprosima::fastrtps::rtps::UDPv4TransportDescriptor;
using eprosima::fastrtps::rtps::UDPv6TransportDescriptor;
using eprosima::fastdds::rtps::BuiltinTransports;
using eprosima::fastdds::rtps::BuiltinTransportsOptions;

template<class TypeSupport>
class PubSubWriter
Expand Down Expand Up @@ -760,15 +762,15 @@ class PubSubWriter
}

PubSubWriter& setup_transports(
eprosima::fastdds::rtps::BuiltinTransports transports)
BuiltinTransports transports)
{
participant_attr_.rtps.setup_transports(transports);
return *this;
}

PubSubWriter& setup_transports(
eprosima::fastdds::rtps::BuiltinTransports transports,
const eprosima::fastdds::rtps::BuiltinTransportsOptions& options)
BuiltinTransports transports,
const BuiltinTransportsOptions& options)
{
participant_attr_.rtps.setup_transports(transports, options);
return *this;
Expand All @@ -777,9 +779,10 @@ class PubSubWriter
PubSubWriter& setup_large_data_tcp(
bool v6 = false,
const uint16_t& port = 0,
const uint32_t& tcp_negotiation_timeout = 0)
const BuiltinTransportsOptions& options = BuiltinTransportsOptions())
{
participant_attr_.rtps.useBuiltinTransports = false;
participant_attr_.rtps.max_msg_size_no_frag = options.maxMessageSize;

/* Transports configuration */
// UDP transport for PDP over multicast
Expand All @@ -789,6 +792,9 @@ class PubSubWriter
if (v6)
{
auto pdp_transport = std::make_shared<eprosima::fastdds::rtps::UDPv6TransportDescriptor>();
pdp_transport->maxMessageSize = options.maxMessageSize;
pdp_transport->sendBufferSize = options.sockets_buffer_size;
pdp_transport->receiveBufferSize = options.sockets_buffer_size;
participant_attr_.rtps.userTransports.push_back(pdp_transport);

auto data_transport = std::make_shared<eprosima::fastdds::rtps::TCPv6TransportDescriptor>();
Expand All @@ -797,12 +803,18 @@ class PubSubWriter
data_transport->check_crc = false;
data_transport->apply_security = false;
data_transport->enable_tcp_nodelay = true;
data_transport->tcp_negotiation_timeout = tcp_negotiation_timeout;
data_transport->maxMessageSize = options.maxMessageSize;
data_transport->sendBufferSize = options.sockets_buffer_size;
data_transport->receiveBufferSize = options.sockets_buffer_size;
data_transport->tcp_negotiation_timeout = options.tcp_negotiation_timeout;
participant_attr_.rtps.userTransports.push_back(data_transport);
}
else
{
auto pdp_transport = std::make_shared<eprosima::fastdds::rtps::UDPv4TransportDescriptor>();
pdp_transport->maxMessageSize = options.maxMessageSize;
pdp_transport->sendBufferSize = options.sockets_buffer_size;
pdp_transport->receiveBufferSize = options.sockets_buffer_size;
participant_attr_.rtps.userTransports.push_back(pdp_transport);

auto data_transport = std::make_shared<eprosima::fastdds::rtps::TCPv4TransportDescriptor>();
Expand All @@ -811,7 +823,10 @@ class PubSubWriter
data_transport->check_crc = false;
data_transport->apply_security = false;
data_transport->enable_tcp_nodelay = true;
data_transport->tcp_negotiation_timeout = tcp_negotiation_timeout;
data_transport->maxMessageSize = options.maxMessageSize;
data_transport->sendBufferSize = options.sockets_buffer_size;
data_transport->receiveBufferSize = options.sockets_buffer_size;
data_transport->tcp_negotiation_timeout = options.tcp_negotiation_timeout;
participant_attr_.rtps.userTransports.push_back(data_transport);
}

Expand Down
14 changes: 14 additions & 0 deletions test/blackbox/common/BlackboxTests.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

#include "../types/Data1mbPubSubTypes.h"
#include "../types/Data64kbPubSubTypes.h"
#include "../types/Data100kbPubSubTypes.h"
#include "../types/FixedSizedPubSubTypes.h"
#include "../types/HelloWorldPubSubTypes.h"
#include "../types/KeyedData1mbPubSubTypes.h"
Expand Down Expand Up @@ -88,6 +89,10 @@ template<>
void default_receive_print(
const Data64kb& data);

template<>
void default_receive_print(
const Data100kb& data);

template<>
void default_receive_print(
const Data1mb& data);
Expand Down Expand Up @@ -127,6 +132,10 @@ template<>
void default_send_print(
const Data64kb& data);

template<>
void default_send_print(
const Data100kb& data);

template<>
void default_send_print(
const Data1mb& data);
Expand Down Expand Up @@ -168,6 +177,9 @@ std::list<Data1mb> default_data300kb_mix_data_generator(
std::list<Data1mb> default_data96kb_data300kb_data_generator(
size_t max = 0);

std::list<Data100kb> default_data100kb_data_generator(
size_t max = 0);

std::list<KeyedData1mb> default_keyeddata300kb_data_generator(
size_t max = 0);

Expand All @@ -185,6 +197,8 @@ extern const std::function<void(const StringTest&)> default_string_print;

extern const std::function<void(const Data64kb&)> default_data64kb_print;

extern const std::function<void(const Data100kb&)> default_data100kb_print;

extern const std::function<void(const Data1mb&)> default_data300kb_print;

template<typename T>
Expand Down
Loading
Loading