Skip to content

Commit a962cba

Browse files
Improvements on RPC entities (#5801)
* Refs #23153. Add checks for full match in RequesterImpl. Signed-off-by: Miguel Company <[email protected]> * Refs #23153. Add checks for full match in ReplierImpl. Signed-off-by: Miguel Company <[email protected]> * Refs #23153. Improve default values on ReplierQos and RequesterQos. Signed-off-by: Miguel Company <[email protected]> * Refs #23153. Apply review on Replier. Signed-off-by: Miguel Company <[email protected]> * Refs #23153. Apply review on Requester. Signed-off-by: Miguel Company <[email protected]> * Refs #23153. Fix failing test. Signed-off-by: Miguel Company <[email protected]> --------- Signed-off-by: Miguel Company <[email protected]>
1 parent e7510c9 commit a962cba

File tree

11 files changed

+179
-6
lines changed

11 files changed

+179
-6
lines changed

include/fastdds/dds/domain/qos/ReplierQos.hpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,14 @@ class ReplierQos
4141
// Set reliability to RELIABLE_RELIABILITY_QOS by default
4242
writer_qos.reliability().kind = RELIABLE_RELIABILITY_QOS;
4343
reader_qos.reliability().kind = RELIABLE_RELIABILITY_QOS;
44+
45+
// Set durability to VOLATILE_DURABILITY_QOS by default
46+
writer_qos.durability().kind = VOLATILE_DURABILITY_QOS;
47+
reader_qos.durability().kind = VOLATILE_DURABILITY_QOS;
48+
49+
// Set history to KEEP_ALL_HISTORY_QOS by default
50+
writer_qos.history().kind = KEEP_ALL_HISTORY_QOS;
51+
reader_qos.history().kind = KEEP_ALL_HISTORY_QOS;
4452
}
4553

4654
/**

include/fastdds/dds/domain/qos/RequesterQos.hpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,14 @@ class RequesterQos
4141
// Set reliability to RELIABLE_RELIABILITY_QOS by default
4242
writer_qos.reliability().kind = RELIABLE_RELIABILITY_QOS;
4343
reader_qos.reliability().kind = RELIABLE_RELIABILITY_QOS;
44+
45+
// Set durability to VOLATILE_DURABILITY_QOS by default
46+
writer_qos.durability().kind = VOLATILE_DURABILITY_QOS;
47+
reader_qos.durability().kind = VOLATILE_DURABILITY_QOS;
48+
49+
// Set history to KEEP_ALL_HISTORY_QOS by default
50+
writer_qos.history().kind = KEEP_ALL_HISTORY_QOS;
51+
reader_qos.history().kind = KEEP_ALL_HISTORY_QOS;
4452
}
4553

4654
/**

include/fastdds/dds/rpc/Replier.hpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,12 @@ class Replier : public RPCEntity
5050
*
5151
* @param data Data to send
5252
* @param info Information about the reply sample. This information is used to match the reply with the request through the SampleIdentity
53-
* @return RETCODE_OK if the reply was sent successfully or a ReturnCode related to the specific error otherwise
53+
*
54+
* @return RETCODE_OK if the reply was sent successfully
55+
* @return RETCODE_PRECONDITION_NOT_MET if the replier is not enabled
56+
* @return RETCODE_NO_DATA if the requester that sent the request has disconnected (this usually means that the reply can be dropped)
57+
* @return RETCODE_TIMEOUT if waiting for the requester to be fully matched timed out
58+
* @return a ReturnCode from the underlying DataWriter
5459
*/
5560
virtual ReturnCode_t send_reply(
5661
void* data,

include/fastdds/dds/rpc/Requester.hpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,10 @@ class Requester : public RPCEntity
5050
*
5151
* @param data Data to send
5252
* @param info Information about the request sample. This information is used to match the request with the reply through the SampleIdentity
53-
* @return RETCODE_OK if the reply was sent successfully or a ReturnCode related to the specific error otherwise
53+
*
54+
* @return RETCODE_OK if the reply was sent successfully
55+
* @return RETCODE_PRECONDITION_NOT_MET if the requester is not enabled or it is not fully matched
56+
* @return a ReturnCode from the underlying DataWriter
5457
*/
5558
virtual ReturnCode_t send_request(
5659
void* data,

src/cpp/fastdds/rpc/ReplierImpl.cpp

Lines changed: 68 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,14 @@
1616

1717
#include <string>
1818

19+
#include <fastdds/dds/builtin/topic/PublicationBuiltinTopicData.hpp>
20+
#include <fastdds/dds/builtin/topic/SubscriptionBuiltinTopicData.hpp>
1921
#include <fastdds/dds/core/detail/DDSReturnCode.hpp>
2022
#include <fastdds/dds/core/LoanableCollection.hpp>
2123
#include <fastdds/dds/core/LoanableSequence.hpp>
24+
#include <fastdds/dds/core/status/PublicationMatchedStatus.hpp>
2225
#include <fastdds/dds/core/status/StatusMask.hpp>
26+
#include <fastdds/dds/core/status/SubscriptionMatchedStatus.hpp>
2327
#include <fastdds/dds/domain/qos/ReplierQos.hpp>
2428
#include <fastdds/dds/log/Log.hpp>
2529
#include <fastdds/dds/rpc/RequestInfo.hpp>
@@ -92,14 +96,27 @@ ReturnCode_t ReplierImpl::send_reply(
9296
void* data,
9397
const RequestInfo& info)
9498
{
95-
FASTDDS_TODO_BEFORE(3, 3, "Implement matching algorithm");
96-
9799
if (!enabled_)
98100
{
99101
EPROSIMA_LOG_ERROR(REPLIER, "Trying to send a reply with a disabled replier");
100102
return RETCODE_PRECONDITION_NOT_MET;
101103
}
102104

105+
FASTDDS_TODO_BEFORE(3, 3, "Wait for the requester to be fully matched");
106+
auto match_status = requester_match_status(info);
107+
if (RequesterMatchStatus::UNMATCHED == match_status)
108+
{
109+
// The writer that sent the request has been unmatched.
110+
EPROSIMA_LOG_WARNING(REPLIER, "Trying to send a reply to a disconnected requester");
111+
return RETCODE_NO_DATA;
112+
}
113+
else if (RequesterMatchStatus::PARTIALLY_MATCHED == match_status)
114+
{
115+
// The writer that sent the request is still matched, but the reply topic is not.
116+
EPROSIMA_LOG_WARNING(REPLIER, "Trying to send a reply to a partially matched requester");
117+
return RETCODE_TIMEOUT;
118+
}
119+
103120
rtps::WriteParams wparams;
104121
wparams.related_sample_identity(info.related_sample_identity);
105122

@@ -275,7 +292,55 @@ ReturnCode_t ReplierImpl::delete_contained_entities()
275292
return RETCODE_OK;
276293
}
277294

295+
ReplierImpl::RequesterMatchStatus ReplierImpl::requester_match_status(
296+
const RequestInfo& info) const
297+
{
298+
// Check if the replier is still matched with the requester in the request topic
299+
PublicationBuiltinTopicData pub_data;
300+
if (RETCODE_OK != replier_reader_->get_matched_publication_data(pub_data, info.sample_identity.writer_guid()))
301+
{
302+
return RequesterMatchStatus::UNMATCHED;
303+
}
304+
305+
FASTDDS_TODO_BEFORE(3, 3, "Get reply reader GUID from pub_data");
306+
307+
auto related_guid = info.related_sample_identity.writer_guid();
308+
bool reply_topic_matched = false;
309+
if (info.sample_identity.writer_guid() != related_guid)
310+
{
311+
// Custom related GUID (i.e. reply reader GUID) sent with the request.
312+
// Check if the replier writer is matched with that specific reader.
313+
SubscriptionBuiltinTopicData sub_data;
314+
reply_topic_matched = RETCODE_OK == replier_writer_->get_matched_subscription_data(sub_data, related_guid);
315+
}
316+
else
317+
{
318+
// No custom related GUID, check if the replier is matched with all the requesters
319+
// (same number of matched readers and writers)
320+
reply_topic_matched = is_fully_matched();
321+
}
322+
323+
return reply_topic_matched ?
324+
RequesterMatchStatus::MATCHED :
325+
RequesterMatchStatus::PARTIALLY_MATCHED;
326+
}
327+
328+
bool ReplierImpl::is_fully_matched() const
329+
{
330+
PublicationMatchedStatus pub_status;
331+
SubscriptionMatchedStatus sub_status;
332+
333+
if ((RETCODE_OK == replier_reader_->get_subscription_matched_status(sub_status)) &&
334+
(RETCODE_OK == replier_writer_->get_publication_matched_status(pub_status)))
335+
{
336+
return (pub_status.current_count > 0) && (pub_status.current_count == sub_status.current_count);
337+
}
338+
339+
EPROSIMA_LOG_ERROR(REPLIER, "Error getting matched subscriptions or publications");
340+
return false;
341+
}
342+
278343
} // namespace rpc
279344
} // namespace dds
280345
} // namespace fastdds
281-
} // namespace eprosima
346+
} // namespace eprosima

src/cpp/fastdds/rpc/ReplierImpl.hpp

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,28 @@ class ReplierImpl : public Replier
156156
*/
157157
ReturnCode_t delete_contained_entities();
158158

159+
/**
160+
* @brief Possible states of the replier with respect to a requester
161+
*/
162+
enum class RequesterMatchStatus
163+
{
164+
UNMATCHED, // Request topic not matched
165+
PARTIALLY_MATCHED, // Request topic matched but Reply topic not matched
166+
MATCHED // Both topics matched
167+
};
168+
169+
/**
170+
* @brief Check the matched status of the replier with respect to a requester
171+
*
172+
* @param info Information about the request for which to check the status
173+
*
174+
* @return The matched status of the replier with respect to the requester that sent the request
175+
*/
176+
RequesterMatchStatus requester_match_status(
177+
const RequestInfo& info) const;
178+
179+
bool is_fully_matched() const;
180+
159181
DataReader* replier_reader_;
160182
DataWriter* replier_writer_;
161183
ReplierQos qos_;

src/cpp/fastdds/rpc/RequesterImpl.cpp

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919
#include <fastdds/dds/core/detail/DDSReturnCode.hpp>
2020
#include <fastdds/dds/core/LoanableCollection.hpp>
2121
#include <fastdds/dds/core/LoanableSequence.hpp>
22+
#include <fastdds/dds/core/status/PublicationMatchedStatus.hpp>
2223
#include <fastdds/dds/core/status/StatusMask.hpp>
24+
#include <fastdds/dds/core/status/SubscriptionMatchedStatus.hpp>
2325
#include <fastdds/dds/domain/qos/RequesterQos.hpp>
2426
#include <fastdds/dds/log/Log.hpp>
2527
#include <fastdds/dds/rpc/RequestInfo.hpp>
@@ -68,6 +70,12 @@ ReturnCode_t RequesterImpl::send_request(
6870
return RETCODE_PRECONDITION_NOT_MET;
6971
}
7072

73+
if (!is_fully_matched())
74+
{
75+
EPROSIMA_LOG_WARNING(REQUESTER, "Trying to send a request with an unmatched requester");
76+
return RETCODE_PRECONDITION_NOT_MET;
77+
}
78+
7179
rtps::WriteParams wparams;
7280
wparams.related_sample_identity(info.related_sample_identity);
7381
ReturnCode_t ret = requester_writer_->write(data, wparams);
@@ -117,6 +125,7 @@ ReturnCode_t RequesterImpl::take_reply(
117125
EPROSIMA_LOG_ERROR(REQUESTER, "Trying to take a reply with a disabled requester");
118126
return RETCODE_PRECONDITION_NOT_MET;
119127
}
128+
120129
return requester_reader_->take(data, info);
121130
}
122131

@@ -245,6 +254,21 @@ ReturnCode_t RequesterImpl::delete_contained_entities()
245254
return RETCODE_OK;
246255
}
247256

257+
bool RequesterImpl::is_fully_matched() const
258+
{
259+
PublicationMatchedStatus pub_status;
260+
SubscriptionMatchedStatus sub_status;
261+
262+
if ((RETCODE_OK == requester_reader_->get_subscription_matched_status(sub_status)) &&
263+
(RETCODE_OK == requester_writer_->get_publication_matched_status(pub_status)))
264+
{
265+
return (pub_status.current_count > 0) && (pub_status.current_count == sub_status.current_count);
266+
}
267+
268+
EPROSIMA_LOG_ERROR(REQUESTER, "Error getting matched subscriptions or publications");
269+
return false;
270+
}
271+
248272
} // namespace rpc
249273
} // namespace dds
250274
} // namespace fastdds

src/cpp/fastdds/rpc/RequesterImpl.hpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,13 @@ class RequesterImpl : public Requester
156156
*/
157157
ReturnCode_t delete_contained_entities();
158158

159+
/**
160+
* @brief Check if the requester is fully matched with a replier
161+
*
162+
* @return true if the requester is fully matched, false otherwise
163+
*/
164+
bool is_fully_matched() const;
165+
159166
DataReader* requester_reader_;
160167
DataWriter* requester_writer_;
161168
RequesterQos qos_;

test/blackbox/api/dds-pim/ReqRepHelloWorldRequester.cpp

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include <fastdds/dds/rpc/RequestInfo.hpp>
2929
#include <fastdds/dds/rpc/ServiceTypeSupport.hpp>
3030
#include <fastdds/dds/subscriber/DataReader.hpp>
31+
#include <fastdds/rtps/common/WriteParams.hpp>
3132

3233
#include "ReqRepHelloWorldRequester.hpp"
3334
#include "ReqRepHelloWorldService.hpp"
@@ -170,6 +171,25 @@ void ReqRepHelloWorldRequester::matched()
170171
}
171172
}
172173

174+
void ReqRepHelloWorldRequester::direct_send(
175+
const uint16_t number)
176+
{
177+
WriteParams info;
178+
HelloWorld hello;
179+
hello.index(number);
180+
hello.message("HelloWorld");
181+
182+
{
183+
std::unique_lock<std::mutex> lock(mutex_);
184+
current_number_ = number;
185+
}
186+
187+
ASSERT_EQ(requester_->get_requester_writer()->write((void*)&hello, info), RETCODE_OK);
188+
related_sample_identity_ = info.related_sample_identity();
189+
190+
ASSERT_NE(related_sample_identity_.sequence_number(), SequenceNumber_t());
191+
}
192+
173193
void ReqRepHelloWorldRequester::send(
174194
const uint16_t number)
175195
{

test/blackbox/api/dds-pim/ReqRepHelloWorldRequester.hpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,14 @@ class ReqRepHelloWorldRequester
7878

7979
void matched();
8080

81+
/**
82+
* Sends a request without checking the matching status.
83+
*
84+
* @param number The number to send.
85+
*/
86+
void direct_send(
87+
const uint16_t number);
88+
8189
void send(
8290
const uint16_t number);
8391

test/blackbox/common/BlackboxTestsVolatile.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,13 +250,16 @@ TEST_P(Volatile, AsyncVolatileKeepAllPubReliableSubNonReliableHelloWorld)
250250
// Regression test of Refs #3376, github ros2/rmw_fastrtps #226
251251
TEST_P(Volatile, ReqRepVolatileHelloworldRequesterCheckWriteParams)
252252
{
253+
// Note: this test checks that even when not matched, a valid related_sample_identity is set.
254+
// Since the new requester API yields an error when not matched, direct_send is used instead of send.
255+
253256
ReqRepHelloWorldRequester requester;
254257

255258
requester.init(true);
256259

257260
ASSERT_TRUE(requester.isInitialized());
258261

259-
requester.send(1);
262+
requester.direct_send(1);
260263
}
261264

262265
// Test created to check bug #5423, github ros2/ros2 #703

0 commit comments

Comments
 (0)