Skip to content

[23153] Improvements on RPC entities #5801

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
May 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions include/fastdds/dds/domain/qos/ReplierQos.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ class ReplierQos
// Set reliability to RELIABLE_RELIABILITY_QOS by default
writer_qos.reliability().kind = RELIABLE_RELIABILITY_QOS;
reader_qos.reliability().kind = RELIABLE_RELIABILITY_QOS;

// Set durability to VOLATILE_DURABILITY_QOS by default
writer_qos.durability().kind = VOLATILE_DURABILITY_QOS;
reader_qos.durability().kind = VOLATILE_DURABILITY_QOS;

// Set history to KEEP_ALL_HISTORY_QOS by default
writer_qos.history().kind = KEEP_ALL_HISTORY_QOS;
reader_qos.history().kind = KEEP_ALL_HISTORY_QOS;
}

/**
Expand Down
8 changes: 8 additions & 0 deletions include/fastdds/dds/domain/qos/RequesterQos.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ class RequesterQos
// Set reliability to RELIABLE_RELIABILITY_QOS by default
writer_qos.reliability().kind = RELIABLE_RELIABILITY_QOS;
reader_qos.reliability().kind = RELIABLE_RELIABILITY_QOS;

// Set durability to VOLATILE_DURABILITY_QOS by default
writer_qos.durability().kind = VOLATILE_DURABILITY_QOS;
reader_qos.durability().kind = VOLATILE_DURABILITY_QOS;

// Set history to KEEP_ALL_HISTORY_QOS by default
writer_qos.history().kind = KEEP_ALL_HISTORY_QOS;
reader_qos.history().kind = KEEP_ALL_HISTORY_QOS;
}

/**
Expand Down
7 changes: 6 additions & 1 deletion include/fastdds/dds/rpc/Replier.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,12 @@ class Replier : public RPCEntity
*
* @param data Data to send
* @param info Information about the reply sample. This information is used to match the reply with the request through the SampleIdentity
* @return RETCODE_OK if the reply was sent successfully or a ReturnCode related to the specific error otherwise
*
* @return RETCODE_OK if the reply was sent successfully
* @return RETCODE_PRECONDITION_NOT_MET if the replier is not enabled
* @return RETCODE_NO_DATA if the requester that sent the request has disconnected (this usually means that the reply can be dropped)
* @return RETCODE_TIMEOUT if waiting for the requester to be fully matched timed out
* @return a ReturnCode from the underlying DataWriter
*/
virtual ReturnCode_t send_reply(
void* data,
Expand Down
5 changes: 4 additions & 1 deletion include/fastdds/dds/rpc/Requester.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@ class Requester : public RPCEntity
*
* @param data Data to send
* @param info Information about the request sample. This information is used to match the request with the reply through the SampleIdentity
* @return RETCODE_OK if the reply was sent successfully or a ReturnCode related to the specific error otherwise
*
* @return RETCODE_OK if the reply was sent successfully
* @return RETCODE_PRECONDITION_NOT_MET if the requester is not enabled or it is not fully matched
* @return a ReturnCode from the underlying DataWriter
*/
virtual ReturnCode_t send_request(
void* data,
Expand Down
71 changes: 68 additions & 3 deletions src/cpp/fastdds/rpc/ReplierImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@

#include <string>

#include <fastdds/dds/builtin/topic/PublicationBuiltinTopicData.hpp>
#include <fastdds/dds/builtin/topic/SubscriptionBuiltinTopicData.hpp>
#include <fastdds/dds/core/detail/DDSReturnCode.hpp>
#include <fastdds/dds/core/LoanableCollection.hpp>
#include <fastdds/dds/core/LoanableSequence.hpp>
#include <fastdds/dds/core/status/PublicationMatchedStatus.hpp>
#include <fastdds/dds/core/status/StatusMask.hpp>
#include <fastdds/dds/core/status/SubscriptionMatchedStatus.hpp>
#include <fastdds/dds/domain/qos/ReplierQos.hpp>
#include <fastdds/dds/log/Log.hpp>
#include <fastdds/dds/rpc/RequestInfo.hpp>
Expand Down Expand Up @@ -92,14 +96,27 @@ ReturnCode_t ReplierImpl::send_reply(
void* data,
const RequestInfo& info)
{
FASTDDS_TODO_BEFORE(3, 3, "Implement matching algorithm");

if (!enabled_)
{
EPROSIMA_LOG_ERROR(REPLIER, "Trying to send a reply with a disabled replier");
return RETCODE_PRECONDITION_NOT_MET;
}

FASTDDS_TODO_BEFORE(3, 3, "Wait for the requester to be fully matched");
auto match_status = requester_match_status(info);
if (RequesterMatchStatus::UNMATCHED == match_status)
{
// The writer that sent the request has been unmatched.
EPROSIMA_LOG_WARNING(REPLIER, "Trying to send a reply to a disconnected requester");
return RETCODE_NO_DATA;
}
else if (RequesterMatchStatus::PARTIALLY_MATCHED == match_status)
{
// The writer that sent the request is still matched, but the reply topic is not.
EPROSIMA_LOG_WARNING(REPLIER, "Trying to send a reply to a partially matched requester");
return RETCODE_TIMEOUT;
}

rtps::WriteParams wparams;
wparams.related_sample_identity(info.related_sample_identity);

Expand Down Expand Up @@ -275,7 +292,55 @@ ReturnCode_t ReplierImpl::delete_contained_entities()
return RETCODE_OK;
}

ReplierImpl::RequesterMatchStatus ReplierImpl::requester_match_status(
const RequestInfo& info) const
{
// Check if the replier is still matched with the requester in the request topic
PublicationBuiltinTopicData pub_data;
if (RETCODE_OK != replier_reader_->get_matched_publication_data(pub_data, info.sample_identity.writer_guid()))
{
return RequesterMatchStatus::UNMATCHED;
}

FASTDDS_TODO_BEFORE(3, 3, "Get reply reader GUID from pub_data");

auto related_guid = info.related_sample_identity.writer_guid();
bool reply_topic_matched = false;
if (info.sample_identity.writer_guid() != related_guid)
{
// Custom related GUID (i.e. reply reader GUID) sent with the request.
// Check if the replier writer is matched with that specific reader.
SubscriptionBuiltinTopicData sub_data;
reply_topic_matched = RETCODE_OK == replier_writer_->get_matched_subscription_data(sub_data, related_guid);
}
else
{
// No custom related GUID, check if the replier is matched with all the requesters
// (same number of matched readers and writers)
reply_topic_matched = is_fully_matched();
}

return reply_topic_matched ?
RequesterMatchStatus::MATCHED :
RequesterMatchStatus::PARTIALLY_MATCHED;
}

bool ReplierImpl::is_fully_matched() const
{
PublicationMatchedStatus pub_status;
SubscriptionMatchedStatus sub_status;

if ((RETCODE_OK == replier_reader_->get_subscription_matched_status(sub_status)) &&
(RETCODE_OK == replier_writer_->get_publication_matched_status(pub_status)))
{
return (pub_status.current_count > 0) && (pub_status.current_count == sub_status.current_count);
}

EPROSIMA_LOG_ERROR(REPLIER, "Error getting matched subscriptions or publications");
return false;
}

} // namespace rpc
} // namespace dds
} // namespace fastdds
} // namespace eprosima
} // namespace eprosima
22 changes: 22 additions & 0 deletions src/cpp/fastdds/rpc/ReplierImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,28 @@ class ReplierImpl : public Replier
*/
ReturnCode_t delete_contained_entities();

/**
* @brief Possible states of the replier with respect to a requester
*/
enum class RequesterMatchStatus
{
UNMATCHED, // Request topic not matched
PARTIALLY_MATCHED, // Request topic matched but Reply topic not matched
MATCHED // Both topics matched
};

/**
* @brief Check the matched status of the replier with respect to a requester
*
* @param info Information about the request for which to check the status
*
* @return The matched status of the replier with respect to the requester that sent the request
*/
RequesterMatchStatus requester_match_status(
const RequestInfo& info) const;

bool is_fully_matched() const;

DataReader* replier_reader_;
DataWriter* replier_writer_;
ReplierQos qos_;
Expand Down
24 changes: 24 additions & 0 deletions src/cpp/fastdds/rpc/RequesterImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
#include <fastdds/dds/core/detail/DDSReturnCode.hpp>
#include <fastdds/dds/core/LoanableCollection.hpp>
#include <fastdds/dds/core/LoanableSequence.hpp>
#include <fastdds/dds/core/status/PublicationMatchedStatus.hpp>
#include <fastdds/dds/core/status/StatusMask.hpp>
#include <fastdds/dds/core/status/SubscriptionMatchedStatus.hpp>
#include <fastdds/dds/domain/qos/RequesterQos.hpp>
#include <fastdds/dds/log/Log.hpp>
#include <fastdds/dds/rpc/RequestInfo.hpp>
Expand Down Expand Up @@ -68,6 +70,12 @@ ReturnCode_t RequesterImpl::send_request(
return RETCODE_PRECONDITION_NOT_MET;
}

if (!is_fully_matched())
{
EPROSIMA_LOG_WARNING(REQUESTER, "Trying to send a request with an unmatched requester");
return RETCODE_PRECONDITION_NOT_MET;
}

rtps::WriteParams wparams;
wparams.related_sample_identity(info.related_sample_identity);
ReturnCode_t ret = requester_writer_->write(data, wparams);
Expand Down Expand Up @@ -117,6 +125,7 @@ ReturnCode_t RequesterImpl::take_reply(
EPROSIMA_LOG_ERROR(REQUESTER, "Trying to take a reply with a disabled requester");
return RETCODE_PRECONDITION_NOT_MET;
}

return requester_reader_->take(data, info);
}

Expand Down Expand Up @@ -245,6 +254,21 @@ ReturnCode_t RequesterImpl::delete_contained_entities()
return RETCODE_OK;
}

bool RequesterImpl::is_fully_matched() const
{
PublicationMatchedStatus pub_status;
SubscriptionMatchedStatus sub_status;

if ((RETCODE_OK == requester_reader_->get_subscription_matched_status(sub_status)) &&
(RETCODE_OK == requester_writer_->get_publication_matched_status(pub_status)))
{
return (pub_status.current_count > 0) && (pub_status.current_count == sub_status.current_count);
}

EPROSIMA_LOG_ERROR(REQUESTER, "Error getting matched subscriptions or publications");
return false;
}

} // namespace rpc
} // namespace dds
} // namespace fastdds
Expand Down
7 changes: 7 additions & 0 deletions src/cpp/fastdds/rpc/RequesterImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,13 @@ class RequesterImpl : public Requester
*/
ReturnCode_t delete_contained_entities();

/**
* @brief Check if the requester is fully matched with a replier
*
* @return true if the requester is fully matched, false otherwise
*/
bool is_fully_matched() const;

DataReader* requester_reader_;
DataWriter* requester_writer_;
RequesterQos qos_;
Expand Down
20 changes: 20 additions & 0 deletions test/blackbox/api/dds-pim/ReqRepHelloWorldRequester.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <fastdds/dds/rpc/RequestInfo.hpp>
#include <fastdds/dds/rpc/ServiceTypeSupport.hpp>
#include <fastdds/dds/subscriber/DataReader.hpp>
#include <fastdds/rtps/common/WriteParams.hpp>

#include "ReqRepHelloWorldRequester.hpp"
#include "ReqRepHelloWorldService.hpp"
Expand Down Expand Up @@ -170,6 +171,25 @@ void ReqRepHelloWorldRequester::matched()
}
}

void ReqRepHelloWorldRequester::direct_send(
const uint16_t number)
{
WriteParams info;
HelloWorld hello;
hello.index(number);
hello.message("HelloWorld");

{
std::unique_lock<std::mutex> lock(mutex_);
current_number_ = number;
}

ASSERT_EQ(requester_->get_requester_writer()->write((void*)&hello, info), RETCODE_OK);
related_sample_identity_ = info.related_sample_identity();

ASSERT_NE(related_sample_identity_.sequence_number(), SequenceNumber_t());
}

void ReqRepHelloWorldRequester::send(
const uint16_t number)
{
Expand Down
8 changes: 8 additions & 0 deletions test/blackbox/api/dds-pim/ReqRepHelloWorldRequester.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ class ReqRepHelloWorldRequester

void matched();

/**
* Sends a request without checking the matching status.
*
* @param number The number to send.
*/
void direct_send(
const uint16_t number);

void send(
const uint16_t number);

Expand Down
5 changes: 4 additions & 1 deletion test/blackbox/common/BlackboxTestsVolatile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,13 +250,16 @@ TEST_P(Volatile, AsyncVolatileKeepAllPubReliableSubNonReliableHelloWorld)
// Regression test of Refs #3376, github ros2/rmw_fastrtps #226
TEST_P(Volatile, ReqRepVolatileHelloworldRequesterCheckWriteParams)
{
// Note: this test checks that even when not matched, a valid related_sample_identity is set.
// Since the new requester API yields an error when not matched, direct_send is used instead of send.

ReqRepHelloWorldRequester requester;

requester.init(true);

ASSERT_TRUE(requester.isInitialized());

requester.send(1);
requester.direct_send(1);
}

// Test created to check bug #5423, github ros2/ros2 #703
Expand Down
Loading