Skip to content

Commit 0bc6524

Browse files
morlov-apexaimergify[bot]
authored andcommitted
Gracefully handle SIGINT and SIGTERM in rosbag2 recorder (#1301)
* Gracefully handle SIGINT and SIGTERM signal for rosbag2 recorder - Call recorder->stop() and exec_->cancel() instead of rclcpp::shutdown Signed-off-by: Michael Orlov <[email protected]> * Use singleton for static executor in the rosbag2_py::Recorder Signed-off-by: Michael Orlov <[email protected]> * Rollback to the non-static executor and don't call executor->cancel() - In case when signal will arrive we will trigger our internal exit_ variable and wait while current exec_->spin_all(10msec) will exit. Signed-off-by: Michael Orlov <[email protected]> * Spin recorder node in a separate thread for better handling exit - Run exec->spin() in a separate thread, because we need to call exec->cancel() after recorder->stop() to be able to send notifications about bag split and close. - Wait on conditional variable for exit_ flag Signed-off-by: Michael Orlov <[email protected]> * Address race condition in rosbag2_py.test_record_cancel - Add `record_thread.join()` before trying to parse metadata. Signed-off-by: Michael Orlov <[email protected]> --------- Signed-off-by: Michael Orlov <[email protected]> (cherry picked from commit 46a23e9)
1 parent 33edccf commit 0bc6524

File tree

2 files changed

+40
-12
lines changed

2 files changed

+40
-12
lines changed

rosbag2_py/src/rosbag2_py/_transport.cpp

Lines changed: 39 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -189,17 +189,20 @@ class Player
189189

190190
class Recorder
191191
{
192-
private:
193-
std::unique_ptr<rclcpp::executors::SingleThreadedExecutor> exec_;
194-
195192
public:
196193
Recorder()
197194
{
198-
rclcpp::init(0, nullptr);
199-
exec_ = std::make_unique<rclcpp::executors::SingleThreadedExecutor>();
195+
auto init_options = rclcpp::InitOptions();
196+
init_options.shutdown_on_signal = false;
197+
rclcpp::init(0, nullptr, init_options, rclcpp::SignalHandlerOptions::None);
198+
200199
std::signal(
201200
SIGTERM, [](int /* signal */) {
202-
rclcpp::shutdown();
201+
rosbag2_py::Recorder::cancel();
202+
});
203+
std::signal(
204+
SIGINT, [](int /* signal */) {
205+
rosbag2_py::Recorder::cancel();
203206
});
204207
}
205208

@@ -213,6 +216,8 @@ class Recorder
213216
RecordOptions & record_options,
214217
std::string & node_name)
215218
{
219+
exit_ = false;
220+
auto exec = std::make_unique<rclcpp::executors::SingleThreadedExecutor>();
216221
if (record_options.rmw_serialization_format.empty()) {
217222
record_options.rmw_serialization_format = std::string(rmw_get_serialization_format());
218223
}
@@ -222,20 +227,42 @@ class Recorder
222227
std::move(writer), storage_options, record_options, node_name);
223228
recorder->record();
224229

225-
exec_->add_node(recorder);
226-
// Release the GIL for long-running record, so that calling Python code can use other threads
230+
exec->add_node(recorder);
231+
// Run exec->spin() in a separate thread, because we need to call exec->cancel() after
232+
// recorder->stop() to be able to send notifications about bag split and close.
233+
auto spin_thread = std::thread(
234+
[&exec]() {
235+
exec->spin();
236+
});
227237
{
238+
// Release the GIL for long-running record, so that calling Python code can use other threads
228239
py::gil_scoped_release release;
229-
exec_->spin();
240+
std::unique_lock<std::mutex> lock(wait_for_exit_mutex_);
241+
wait_for_exit_cv_.wait(lock, [] {return rosbag2_py::Recorder::exit_.load();});
242+
recorder->stop();
243+
}
244+
exec->cancel();
245+
if (spin_thread.joinable()) {
246+
spin_thread.join();
230247
}
248+
exec->remove_node(recorder);
231249
}
232250

233-
void cancel()
251+
static void cancel()
234252
{
235-
exec_->cancel();
253+
exit_ = true;
254+
wait_for_exit_cv_.notify_all();
236255
}
256+
257+
protected:
258+
static std::atomic_bool exit_;
259+
static std::condition_variable wait_for_exit_cv_;
260+
std::mutex wait_for_exit_mutex_;
237261
};
238262

263+
std::atomic_bool Recorder::exit_{false};
264+
std::condition_variable Recorder::wait_for_exit_cv_{};
265+
239266
// Return a RecordOptions struct with defaults set for rewriting bags.
240267
rosbag2_transport::RecordOptions bag_rewrite_default_record_options()
241268
{
@@ -360,7 +387,7 @@ PYBIND11_MODULE(_transport, m) {
360387
.def(
361388
"record", &rosbag2_py::Recorder::record, py::arg("storage_options"), py::arg("record_options"),
362389
py::arg("node_name") = "rosbag2_recorder")
363-
.def("cancel", &rosbag2_py::Recorder::cancel)
390+
.def_static("cancel", &rosbag2_py::Recorder::cancel)
364391
;
365392

366393
m.def(

rosbag2_py/test/test_transport.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ def test_record_cancel(tmp_path, storage_id):
8080
metadata_io = rosbag2_py.MetadataIo()
8181
assert wait_for(lambda: metadata_io.metadata_file_exists(bag_path),
8282
timeout=rclpy.duration.Duration(seconds=3))
83+
record_thread.join()
8384

8485
metadata = metadata_io.read_metadata(bag_path)
8586
assert(len(metadata.relative_file_paths))

0 commit comments

Comments
 (0)