Skip to content

Commit d20f755

Browse files
authored
Add a direct API for rosbag2_transport::Recorder (#2221)
* Add a direct API for rosbag2_transport::Recorder Add direct recorder API add_channel(..) and write_message(..) Add direct write_message(..) API with recv_timestamp Add direct `on_messages_lost_in_transport(..)` API Note: The `on_messages_lost_in_transport(..)` method is expected to be called when messages are lost in the transport layer. Also added `get_total_num_messages_lost_in_transport()` getter method. Signed-off-by: Michael Orlov <[email protected]> * Add RecorderImpl::get_total_num_messages_lost_in_transport() Signed-off-by: Michael Orlov <[email protected]> --------- Signed-off-by: Michael Orlov <[email protected]>
1 parent 572f98e commit d20f755

File tree

3 files changed

+577
-1
lines changed

3 files changed

+577
-1
lines changed

rosbag2_transport/include/rosbag2_transport/recorder.hpp

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,102 @@ class Recorder : public rclcpp::Node
117117
ROSBAG2_TRANSPORT_PUBLIC
118118
virtual ~Recorder();
119119

120+
/// @brief Start recording.
121+
/// The record() method will return almost immediately and recording will happen in background.
120122
ROSBAG2_TRANSPORT_PUBLIC
121123
void record();
122124

125+
/// @brief Add a new channel (topic) to the rosbag2 writer to be recorded.
126+
/// \details This is a direct Recorder API equivalent to the rosbag2_cpp::Writer::add_topic().
127+
/// \note This method does not require the message definition. The recorder will try to find the
128+
/// corresponding message definition by the given topic name and type.
129+
/// @param topic_name The name of the topic.
130+
/// @param topic_type The type of the topic.
131+
/// @param serialization_format The serialization format of the topic.
132+
/// @param type_description_hash REP-2011 type description hash of the topic.
133+
/// @param offered_qos_profiles The list of offered QoS profiles for the topic if available.
134+
ROSBAG2_TRANSPORT_PUBLIC
135+
void add_channel(
136+
const std::string & topic_name,
137+
const std::string & topic_type,
138+
const std::string & serialization_format = "memory_view",
139+
const std::string & type_description_hash = "",
140+
const std::vector<rclcpp::QoS> & offered_qos_profiles = {});
141+
142+
/// \brief Add a new channel (topic) to the rosbag2 writer to be recorded.
143+
/// \details This is a direct Recorder API equivalent to the rosbag2_cpp::Writer::add_topic().
144+
/// \param topic_name The name of the topic.
145+
/// \param topic_type The type of the topic.
146+
/// \param message_definition_encoding The encoding technique used in
147+
/// the `encoded_message_definition` e.g. "ros2idl", "ros2msg", "apex_json" or "unknown" if
148+
/// encoded_message_definition is empty.
149+
/// \param encoded_message_definition The fully encoded message definition for this type.
150+
/// \param serialization_format The serialization format of the topic.
151+
/// \param type_description_hash REP-2011 type description hash of the topic.
152+
/// \param offered_qos_profiles The list of offered QoS profiles for the topic if available.
153+
ROSBAG2_TRANSPORT_PUBLIC
154+
void add_channel(
155+
const std::string & topic_name,
156+
const std::string & topic_type,
157+
const std::string & message_definition_encoding,
158+
const std::string & encoded_message_definition,
159+
const std::string & serialization_format = "memory_view",
160+
const std::string & type_description_hash = "",
161+
const std::vector<rclcpp::QoS> & offered_qos_profiles = {});
162+
163+
/// @brief Write a serialized message to the bag file.
164+
/// \details This is a direct Recorder API equivalent to the rosbag2_cpp::Writer::write().
165+
/// \note This method assumes that the topic has already been created via add_channel().
166+
/// \note If recorder is in pause mode, this method will return without writing anything.
167+
/// @param serialized_data The serialized message data to write.
168+
/// @param topic_name The name of the topic the message belongs to.
169+
/// @param pub_timestamp The original or publication timestamp of the message.
170+
/// @param sequence_number An optional sequence number of the message. If non-zero, sequence
171+
/// numbers should be unique per channel and increasing over time.
172+
/// \throws std::runtime_error if topic has not been added via add_channel().
173+
ROSBAG2_TRANSPORT_PUBLIC
174+
void write_message(
175+
std::shared_ptr<rcutils_uint8_array_t> serialized_data,
176+
const std::string & topic_name,
177+
const rcutils_time_point_value_t & pub_timestamp,
178+
uint32_t sequence_number = 0);
179+
180+
/// @brief Write a serialized message to the bag file with receive timestamp.
181+
/// \details This is a direct Recorder API equivalent to the rosbag2_cpp::Writer::write().
182+
/// \note This method assumes that the topic has already been created via add_channel().
183+
/// \note If recorder is in pause mode, this method will return without writing anything.
184+
/// @param serialized_data The serialized message data to write.
185+
/// @param topic_name The name of the topic the message belongs to.
186+
/// @param pub_timestamp The original or publication timestamp of the message in nanoseconds.
187+
/// @param recv_timestamp The timestamp of the message in nanoseconds when message was received.
188+
/// @param sequence_number An optional sequence number of the message. If non-zero, sequence
189+
/// numbers should be unique per channel and increasing over time.
190+
/// \throws std::runtime_error if topic has not been added via add_channel().
191+
ROSBAG2_TRANSPORT_PUBLIC
192+
void write_message(
193+
std::shared_ptr<rcutils_uint8_array_t> serialized_data,
194+
const std::string & topic_name,
195+
const rcutils_time_point_value_t & pub_timestamp,
196+
const rcutils_time_point_value_t & recv_timestamp,
197+
uint32_t sequence_number = 0);
198+
199+
/// @brief Updates recorder about lost messages on transport layer.
200+
/// @details This a direct recorder API and this method is expected to be called when messages
201+
/// are lost in the transport layer.
202+
/// The recorder may use this information for logging or metrics.
203+
/// @param topic_name The name of the topic.
204+
/// @param qos_msgs_lost_info Information about lost messages.
205+
ROSBAG2_TRANSPORT_PUBLIC
206+
void on_messages_lost_in_transport(
207+
const std::string & topic_name,
208+
const rclcpp::QOSMessageLostInfo & qos_msgs_lost_info);
209+
210+
/// @brief Get total number of messages lost in transport layer.
211+
/// @return Total number of messages lost in transport layer.
212+
[[nodiscard]]
213+
ROSBAG2_TRANSPORT_PUBLIC
214+
uint64_t get_total_num_messages_lost_in_transport() const;
215+
123216
/// @brief Stopping recording.
124217
/// @details The stop() is opposite to the record() operation. It will stop recording, dump
125218
/// all buffers to the disk and close writer. The record() can be called again after stop().

rosbag2_transport/src/rosbag2_transport/recorder.cpp

Lines changed: 237 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,12 +63,103 @@ class RecorderImpl
6363

6464
~RecorderImpl();
6565

66+
/// @brief Start recording.
67+
/// The record() method will return almost immediately and recording will happen in background.
6668
void record();
6769

70+
/// @brief Add a new channel (topic) to the rosbag2 writer to be recorded.
71+
/// \details This is a direct Recorder API equivalent to the rosbag2_cpp::Writer::add_topic().
72+
/// \note This method does not require the message definition. The recorder will try to find the
73+
/// corresponding message definition by the given topic name and type.
74+
/// @param topic_name The name of the topic.
75+
/// @param topic_type The type of the topic.
76+
/// @param serialization_format The serialization format of the topic.
77+
/// @param type_description_hash REP-2011 type description hash of the topic.
78+
/// @param offered_qos_profiles The list of offered QoS profiles for the topic.
79+
void add_channel(
80+
const std::string & topic_name,
81+
const std::string & topic_type,
82+
const std::string & serialization_format = "memory_view",
83+
const std::string & type_description_hash = "",
84+
const std::vector<rclcpp::QoS> & offered_qos_profiles = {});
85+
86+
/// \brief Add a new channel (topic) to the rosbag2 writer to be recorded.
87+
/// \details This is a direct Recorder API equivalent to the rosbag2_cpp::Writer::add_topic().
88+
/// \param topic_name The name of the topic.
89+
/// \param topic_type The type of the topic.
90+
/// \param message_definition_encoding The encoding technique used in
91+
/// the `encoded_message_definition` e.g. "ros2idl", "ros2msg", "apex_json" or "unknown" if
92+
/// encoded_message_definition is empty.
93+
/// \param encoded_message_definition The fully encoded message definition for this type.
94+
/// \param serialization_format The serialization format of the topic.
95+
/// \param type_description_hash REP-2011 type description hash of the topic.
96+
/// \param offered_qos_profiles The list of offered QoS profiles for the topic.
97+
void add_channel(
98+
const std::string & topic_name,
99+
const std::string & topic_type,
100+
const std::string & message_definition_encoding,
101+
const std::string & encoded_message_definition,
102+
const std::string & serialization_format = "memory_view",
103+
const std::string & type_description_hash = "",
104+
const std::vector<rclcpp::QoS> & offered_qos_profiles = {});
105+
106+
/// @brief Write a serialized message to the bag file.
107+
/// \details This is a direct Recorder API equivalent to the rosbag2_cpp::Writer::write().
108+
/// \note This method assumes that the topic has already been created via add_channel().
109+
/// \note If recorder is in pause mode, this method will return without writing anything.
110+
/// \note This overload uses only publication timestamp. The received timestamp will be taken
111+
/// from the underlying node's clock at the time of writing.
112+
/// @param serialized_data The serialized message data to write.
113+
/// @param topic_name The name of the topic the message belongs to.
114+
/// @param pub_timestamp The original or publication timestamp of the message in nanoseconds.
115+
/// @param sequence_number An optional sequence number of the message. If non-zero, sequence
116+
/// numbers should be unique per channel and increasing over time.
117+
/// \throws std::runtime_error if topic has not been added via add_channel().
118+
void write_message(
119+
std::shared_ptr<rcutils_uint8_array_t> serialized_data,
120+
const std::string & topic_name,
121+
const rcutils_time_point_value_t & pub_timestamp,
122+
uint32_t sequence_number = 0);
123+
124+
/// @brief Write a serialized message to the bag file with receive timestamp.
125+
/// \details This is a direct Recorder API equivalent to the rosbag2_cpp::Writer::write().
126+
/// \note This method assumes that the topic has already been created via add_channel().
127+
/// \note If recorder is in pause mode, this method will return without writing anything.
128+
/// @param serialized_data The serialized message data to write.
129+
/// @param topic_name The name of the topic the message belongs to.
130+
/// @param pub_timestamp The original or publication timestamp of the message in nanoseconds.
131+
/// @param recv_timestamp The timestamp of the message in nanoseconds when message was received.
132+
/// @param sequence_number An optional sequence number of the message. If non-zero, sequence
133+
/// numbers should be unique per channel and increasing over time.
134+
/// \throws std::runtime_error if topic has not been added via add_channel().
135+
void write_message(
136+
std::shared_ptr<rcutils_uint8_array_t> serialized_data,
137+
const std::string & topic_name,
138+
const rcutils_time_point_value_t & pub_timestamp,
139+
const rcutils_time_point_value_t & recv_timestamp,
140+
uint32_t sequence_number = 0);
141+
142+
143+
/// @brief Updates recorder about lost messages on transport layer.
144+
/// @details This a direct recorder API and this method is expected to be called when messages
145+
/// are lost in the transport layer.
146+
/// The recorder may use this information for logging or metrics.
147+
/// @param topic_name The name of the topic.
148+
/// @param qos_msgs_lost_info Information about lost messages.
149+
void on_messages_lost_in_transport(
150+
const std::string & topic_name,
151+
const rclcpp::QOSMessageLostInfo & qos_msgs_lost_info);
152+
153+
/// @brief Get total number of messages lost in transport layer.
154+
/// @return Total number of messages lost in transport layer.
155+
[[nodiscard]]
156+
uint64_t get_total_num_messages_lost_in_transport() const;
157+
68158
/// @brief Stopping recording and closing writer.
69159
/// The record() can be called again after stop().
70160
void stop();
71161

162+
/// Get a const reference to the underlying rosbag2 writer.
72163
const rosbag2_cpp::Writer & get_writer_handle();
73164

74165
/// Pause the recording.
@@ -545,6 +636,94 @@ void RecorderImpl::subscribe_topics(
545636
}
546637
}
547638

639+
void RecorderImpl::add_channel(
640+
const std::string & topic_name,
641+
const std::string & topic_type,
642+
const std::string & serialization_format,
643+
const std::string & type_description_hash,
644+
const std::vector<rclcpp::QoS> & offered_qos_profiles)
645+
{
646+
rosbag2_storage::TopicMetadata topic_with_type{
647+
0u,
648+
topic_name,
649+
topic_type,
650+
serialization_format,
651+
offered_qos_profiles,
652+
type_description_hash,
653+
};
654+
writer_->create_topic(topic_with_type);
655+
}
656+
657+
void RecorderImpl::add_channel(
658+
const std::string & topic_name,
659+
const std::string & topic_type,
660+
const std::string & message_definition_encoding,
661+
const std::string & encoded_message_definition,
662+
const std::string & serialization_format,
663+
const std::string & type_description_hash,
664+
const std::vector<rclcpp::QoS> & offered_qos_profiles)
665+
{
666+
rosbag2_storage::TopicMetadata topic_with_type{
667+
0u,
668+
topic_name,
669+
topic_type,
670+
serialization_format,
671+
offered_qos_profiles,
672+
type_description_hash,
673+
};
674+
rosbag2_storage::MessageDefinition message_definition{
675+
topic_type,
676+
message_definition_encoding,
677+
encoded_message_definition,
678+
type_description_hash
679+
};
680+
writer_->create_topic(topic_with_type, message_definition);
681+
}
682+
683+
void RecorderImpl::write_message(
684+
std::shared_ptr<rcutils_uint8_array_t> serialized_data,
685+
const std::string & topic_name,
686+
const rcutils_time_point_value_t & pub_timestamp,
687+
uint32_t sequence_number)
688+
{
689+
write_message(
690+
std::move(serialized_data),
691+
topic_name,
692+
pub_timestamp,
693+
node->now().nanoseconds(),
694+
sequence_number);
695+
}
696+
697+
void RecorderImpl::write_message(
698+
std::shared_ptr<rcutils_uint8_array_t> serialized_data,
699+
const std::string & topic_name,
700+
const rcutils_time_point_value_t & pub_timestamp,
701+
const rcutils_time_point_value_t & recv_timestamp,
702+
uint32_t sequence_number)
703+
{
704+
if (!paused_.load()) {
705+
auto bag_message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
706+
bag_message->serialized_data = std::move(serialized_data);
707+
bag_message->topic_name = topic_name;
708+
bag_message->recv_timestamp = recv_timestamp;
709+
bag_message->send_timestamp = pub_timestamp;
710+
bag_message->sequence_number = sequence_number;
711+
writer_->write(bag_message);
712+
}
713+
}
714+
715+
void RecorderImpl::on_messages_lost_in_transport(
716+
const std::string & topic_name,
717+
const rclcpp::QOSMessageLostInfo & qos_msgs_lost_info)
718+
{
719+
event_notifier_->on_messages_lost_in_transport(topic_name, qos_msgs_lost_info);
720+
}
721+
722+
uint64_t RecorderImpl::get_total_num_messages_lost_in_transport() const
723+
{
724+
return event_notifier_->get_total_num_messages_lost_in_transport();
725+
}
726+
548727
void RecorderImpl::subscribe_topic(const rosbag2_storage::TopicMetadata & topic)
549728
{
550729
if (subscriptions_.find(topic.name) != subscriptions_.end()) {
@@ -579,7 +758,7 @@ RecorderImpl::create_subscription(
579758
rclcpp::SubscriptionOptions sub_options;
580759
sub_options.event_callbacks.message_lost_callback =
581760
[this, topic_name](const rclcpp::QOSMessageLostInfo & msgs_lost_info) {
582-
this->event_notifier_->on_messages_lost_in_transport(topic_name, msgs_lost_info);
761+
on_messages_lost_in_transport(topic_name, msgs_lost_info);
583762
};
584763

585764
#ifdef _WIN32
@@ -819,6 +998,63 @@ void Recorder::record()
819998
pimpl_->record();
820999
}
8211000

1001+
void Recorder::add_channel(
1002+
const std::string & topic_name,
1003+
const std::string & topic_type,
1004+
const std::string & serialization_format,
1005+
const std::string & type_description_hash,
1006+
const std::vector<rclcpp::QoS> & offered_qos_profiles)
1007+
{
1008+
pimpl_->add_channel(
1009+
topic_name, topic_type, serialization_format, type_description_hash, offered_qos_profiles);
1010+
}
1011+
1012+
void Recorder::add_channel(
1013+
const std::string & topic_name,
1014+
const std::string & topic_type,
1015+
const std::string & message_definition_encoding,
1016+
const std::string & encoded_message_definition,
1017+
const std::string & serialization_format,
1018+
const std::string & type_description_hash,
1019+
const std::vector<rclcpp::QoS> & offered_qos_profiles)
1020+
{
1021+
pimpl_->add_channel(
1022+
topic_name, topic_type, message_definition_encoding, encoded_message_definition,
1023+
serialization_format, type_description_hash, offered_qos_profiles);
1024+
}
1025+
1026+
void Recorder::write_message(
1027+
std::shared_ptr<rcutils_uint8_array_t> serialized_data,
1028+
const std::string & topic_name,
1029+
const rcutils_time_point_value_t & pub_timestamp,
1030+
uint32_t sequence_number)
1031+
{
1032+
pimpl_->write_message(
1033+
std::move(serialized_data), topic_name, pub_timestamp, sequence_number);
1034+
}
1035+
1036+
void Recorder::write_message(
1037+
std::shared_ptr<rcutils_uint8_array_t> serialized_data,
1038+
const std::string & topic_name,
1039+
const rcutils_time_point_value_t & pub_timestamp,
1040+
const rcutils_time_point_value_t & recv_timestamp,
1041+
uint32_t sequence_number)
1042+
{
1043+
pimpl_->write_message(
1044+
std::move(serialized_data), topic_name, pub_timestamp, recv_timestamp, sequence_number);
1045+
}
1046+
1047+
void Recorder::on_messages_lost_in_transport(
1048+
const std::string & topic_name, const rclcpp::QOSMessageLostInfo & qos_msgs_lost_info)
1049+
{
1050+
pimpl_->on_messages_lost_in_transport(topic_name, qos_msgs_lost_info);
1051+
}
1052+
1053+
uint64_t Recorder::get_total_num_messages_lost_in_transport() const
1054+
{
1055+
return pimpl_->get_total_num_messages_lost_in_transport();
1056+
}
1057+
8221058
void Recorder::stop()
8231059
{
8241060
pimpl_->stop();

0 commit comments

Comments
 (0)