Skip to content

Commit 48e9d15

Browse files
First step towards modernizing the rt publisher (backport #210) (#233)
Co-authored-by: Lennart Nachtigall <[email protected]>
1 parent e9140b8 commit 48e9d15

File tree

3 files changed

+128
-17
lines changed

3 files changed

+128
-17
lines changed

include/realtime_tools/realtime_publisher.hpp

Lines changed: 46 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -49,27 +49,39 @@
4949

5050
namespace realtime_tools
5151
{
52-
template <class Msg>
52+
template <class MessageT>
5353
class RealtimePublisher
5454
{
55-
private:
56-
using PublisherSharedPtr = typename rclcpp::Publisher<Msg>::SharedPtr;
57-
5855
public:
56+
/// Provide various typedefs to resemble the rclcpp::Publisher type
57+
using PublisherType = rclcpp::Publisher<MessageT>;
58+
using PublisherSharedPtr = typename rclcpp::Publisher<MessageT>::SharedPtr;
59+
60+
using PublishedType = typename rclcpp::TypeAdapter<MessageT>::custom_type;
61+
using ROSMessageType = typename rclcpp::TypeAdapter<MessageT>::ros_message_type;
62+
63+
RCLCPP_SMART_PTR_DEFINITIONS(RealtimePublisher<MessageT>)
64+
5965
/// The msg_ variable contains the data that will get published on the ROS topic.
60-
Msg msg_;
66+
MessageT msg_;
6167

6268
/** \brief Constructor for the realtime publisher
6369
*
6470
* \param publisher the publisher to wrap
6571
*/
6672
explicit RealtimePublisher(PublisherSharedPtr publisher)
67-
: publisher_(publisher), is_running_(false), keep_running_(true), turn_(LOOP_NOT_STARTED)
73+
: publisher_(publisher), is_running_(false), keep_running_(true), turn_(State::LOOP_NOT_STARTED)
6874
{
6975
thread_ = std::thread(&RealtimePublisher::publishingLoop, this);
7076
}
7177

72-
RealtimePublisher() : is_running_(false), keep_running_(false), turn_(LOOP_NOT_STARTED) {}
78+
[[deprecated(
79+
"Use constructor with rclcpp::Publisher<T>::SharedPtr instead - this class does not make sense "
80+
"without a real publisher")]]
81+
RealtimePublisher()
82+
: is_running_(false), keep_running_(false), turn_(State::LOOP_NOT_STARTED)
83+
{
84+
}
7385

7486
/// Destructor
7587
~RealtimePublisher()
@@ -101,7 +113,7 @@ class RealtimePublisher
101113
bool trylock()
102114
{
103115
if (msg_mutex_.try_lock()) {
104-
if (turn_ == REALTIME) {
116+
if (turn_ == State::REALTIME) {
105117
return true;
106118
} else {
107119
msg_mutex_.unlock();
@@ -112,6 +124,23 @@ class RealtimePublisher
112124
}
113125
}
114126

127+
/** \brief Try to get the data lock from realtime and publish the given message
128+
*
129+
* Tries to gain unique access to msg_ variable. If this succeeds
130+
* update the msg_ variable and call unlockAndPublish
131+
* @return false in case no lock for the realtime variable could be acquired
132+
*/
133+
bool tryPublish(const MessageT & msg)
134+
{
135+
if (!trylock()) {
136+
return false;
137+
}
138+
139+
msg_ = msg;
140+
unlockAndPublish();
141+
return true;
142+
}
143+
115144
/** \brief Unlock the msg_ variable
116145
*
117146
* After a successful trylock and after the data is written to the mgs_
@@ -120,7 +149,7 @@ class RealtimePublisher
120149
*/
121150
void unlockAndPublish()
122151
{
123-
turn_ = NON_REALTIME;
152+
turn_ = State::NON_REALTIME;
124153
unlock();
125154
}
126155

@@ -163,10 +192,10 @@ class RealtimePublisher
163192
void publishingLoop()
164193
{
165194
is_running_ = true;
166-
turn_ = REALTIME;
195+
turn_ = State::REALTIME;
167196

168197
while (keep_running_) {
169-
Msg outgoing;
198+
MessageT outgoing;
170199

171200
// Locks msg_ and copies it
172201

@@ -176,7 +205,7 @@ class RealtimePublisher
176205
lock();
177206
#endif
178207

179-
while (turn_ != NON_REALTIME && keep_running_) {
208+
while (turn_ != State::NON_REALTIME && keep_running_) {
180209
#ifdef NON_POLLING
181210
updated_cond_.wait(lock_);
182211
#else
@@ -186,7 +215,7 @@ class RealtimePublisher
186215
#endif
187216
}
188217
outgoing = msg_;
189-
turn_ = REALTIME;
218+
turn_ = State::REALTIME;
190219

191220
unlock();
192221

@@ -210,12 +239,12 @@ class RealtimePublisher
210239
std::condition_variable updated_cond_;
211240
#endif
212241

213-
enum { REALTIME, NON_REALTIME, LOOP_NOT_STARTED };
214-
std::atomic<int> turn_; // Who's turn is it to use msg_?
242+
enum class State : int { REALTIME, NON_REALTIME, LOOP_NOT_STARTED };
243+
std::atomic<State> turn_; // Who's turn is it to use msg_?
215244
};
216245

217-
template <class Msg>
218-
using RealtimePublisherSharedPtr = std::shared_ptr<RealtimePublisher<Msg>>;
246+
template <class MessageT>
247+
using RealtimePublisherSharedPtr = std::shared_ptr<RealtimePublisher<MessageT>>;
219248

220249
} // namespace realtime_tools
221250
#endif // REALTIME_TOOLS__REALTIME_PUBLISHER_HPP_

test/realtime_publisher_tests.cpp

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,3 +91,44 @@ TEST(RealtimePublisher, rt_publish)
9191
EXPECT_STREQ(expected_msg, str_callback.msg_.string_value.c_str());
9292
rclcpp::shutdown();
9393
}
94+
95+
TEST(RealtimePublisher, rt_try_publish)
96+
{
97+
rclcpp::init(0, nullptr);
98+
const size_t ATTEMPTS = 10;
99+
const std::chrono::milliseconds DELAY(250);
100+
101+
const char * expected_msg = "Hello World";
102+
auto node = std::make_shared<rclcpp::Node>("construct_move_destruct");
103+
rclcpp::QoS qos(10);
104+
qos.reliable().transient_local();
105+
auto pub = node->create_publisher<StringMsg>("~/rt_publish", qos);
106+
RealtimePublisher<StringMsg> rt_pub(pub);
107+
108+
// try publish a latched message
109+
bool publish_success = false;
110+
for (std::size_t i = 0; i < ATTEMPTS; ++i) {
111+
StringMsg msg;
112+
msg.string_value = expected_msg;
113+
114+
if (rt_pub.tryPublish(msg)) {
115+
publish_success = true;
116+
break;
117+
}
118+
std::this_thread::sleep_for(DELAY);
119+
}
120+
ASSERT_TRUE(publish_success);
121+
122+
// make sure subscriber gets it
123+
StringCallback str_callback;
124+
125+
auto sub = node->create_subscription<StringMsg>(
126+
"~/rt_publish", qos,
127+
std::bind(&StringCallback::callback, &str_callback, std::placeholders::_1));
128+
for (size_t i = 0; i < ATTEMPTS && str_callback.msg_.string_value.empty(); ++i) {
129+
rclcpp::spin_some(node);
130+
std::this_thread::sleep_for(DELAY);
131+
}
132+
EXPECT_STREQ(expected_msg, str_callback.msg_.string_value.c_str());
133+
rclcpp::shutdown();
134+
}

test/realtime_publisher_tests_non_polling.cpp

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,3 +93,44 @@ TEST(RealtimePublisherNonPolling, rt_publish)
9393
EXPECT_STREQ(expected_msg, str_callback.msg_.string_value.c_str());
9494
rclcpp::shutdown();
9595
}
96+
97+
TEST(RealtimePublisher, rt_try_publish)
98+
{
99+
rclcpp::init(0, nullptr);
100+
const size_t ATTEMPTS = 10;
101+
const std::chrono::milliseconds DELAY(250);
102+
103+
const char * expected_msg = "Hello World";
104+
auto node = std::make_shared<rclcpp::Node>("construct_move_destruct");
105+
rclcpp::QoS qos(10);
106+
qos.reliable().transient_local();
107+
auto pub = node->create_publisher<StringMsg>("~/rt_publish", qos);
108+
RealtimePublisher<StringMsg> rt_pub(pub);
109+
110+
// try publish a latched message
111+
bool publish_success = false;
112+
for (std::size_t i = 0; i < ATTEMPTS; ++i) {
113+
StringMsg msg;
114+
msg.string_value = expected_msg;
115+
116+
if (rt_pub.tryPublish(msg)) {
117+
publish_success = true;
118+
break;
119+
}
120+
std::this_thread::sleep_for(DELAY);
121+
}
122+
ASSERT_TRUE(publish_success);
123+
124+
// make sure subscriber gets it
125+
StringCallback str_callback;
126+
127+
auto sub = node->create_subscription<StringMsg>(
128+
"~/rt_publish", qos,
129+
std::bind(&StringCallback::callback, &str_callback, std::placeholders::_1));
130+
for (size_t i = 0; i < ATTEMPTS && str_callback.msg_.string_value.empty(); ++i) {
131+
rclcpp::spin_some(node);
132+
std::this_thread::sleep_for(DELAY);
133+
}
134+
EXPECT_STREQ(expected_msg, str_callback.msg_.string_value.c_str());
135+
rclcpp::shutdown();
136+
}

0 commit comments

Comments
 (0)