Skip to content

Commit e97cf9e

Browse files
[humble] Gracefully handle SIGINT and SIGTERM in rosbag2 recorder (backport #1301) (#1395)
* Gracefully handle SIGINT and SIGTERM in rosbag2 recorder (#1301) * Gracefully handle SIGINT and SIGTERM signal for rosbag2 recorder - Call recorder->stop() and exec_->cancel() instead of rclcpp::shutdown Signed-off-by: Michael Orlov <[email protected]> * Use singleton for static executor in the rosbag2_py::Recorder Signed-off-by: Michael Orlov <[email protected]> * Rollback to the non-static executor and don't call executor->cancel() - In case when signal will arrive we will trigger our internal exit_ variable and wait while current exec_->spin_all(10msec) will exit. Signed-off-by: Michael Orlov <[email protected]> * Spin recorder node in a separate thread for better handling exit - Run exec->spin() in a separate thread, because we need to call exec->cancel() after recorder->stop() to be able to send notifications about bag split and close. - Wait on conditional variable for exit_ flag Signed-off-by: Michael Orlov <[email protected]> * Address race condition in rosbag2_py.test_record_cancel - Add `record_thread.join()` before trying to parse metadata. Signed-off-by: Michael Orlov <[email protected]> --------- Signed-off-by: Michael Orlov <[email protected]> (cherry picked from commit 46a23e9) # Conflicts: # rosbag2_py/src/rosbag2_py/_transport.cpp # rosbag2_py/test/test_transport.py * Address merge conflicts Signed-off-by: Michael Orlov <[email protected]> * Workaround for segmentation fault in rclcpp::SignalHandler::uninstall() Signed-off-by: Michael Orlov <[email protected]> --------- Signed-off-by: Michael Orlov <[email protected]> Co-authored-by: Michael Orlov <[email protected]>
1 parent 6ceed91 commit e97cf9e

File tree

2 files changed

+44
-13
lines changed

2 files changed

+44
-13
lines changed

rosbag2_py/src/rosbag2_py/_transport.cpp

Lines changed: 43 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -141,29 +141,36 @@ class Player
141141

142142
class Recorder
143143
{
144-
private:
145-
std::unique_ptr<rclcpp::executors::SingleThreadedExecutor> exec_;
146-
147144
public:
148145
Recorder()
149146
{
150-
rclcpp::init(0, nullptr);
151-
exec_ = std::make_unique<rclcpp::executors::SingleThreadedExecutor>();
147+
auto init_options = rclcpp::InitOptions();
148+
init_options.shutdown_on_signal = false;
149+
rclcpp::init(0, nullptr, init_options, rclcpp::SignalHandlerOptions::SigInt);
150+
rclcpp::uninstall_signal_handlers();
151+
152152
std::signal(
153153
SIGTERM, [](int /* signal */) {
154-
rclcpp::shutdown();
154+
rosbag2_py::Recorder::cancel();
155+
});
156+
std::signal(
157+
SIGINT, [](int /* signal */) {
158+
rosbag2_py::Recorder::cancel();
155159
});
156160
}
157161

158162
virtual ~Recorder()
159163
{
164+
// rclcpp::install_signal_handlers(rclcpp::SignalHandlerOptions::SigTerm);
160165
rclcpp::shutdown();
161166
}
162167

163168
void record(
164169
const rosbag2_storage::StorageOptions & storage_options,
165170
RecordOptions & record_options)
166171
{
172+
exit_ = false;
173+
auto exec = std::make_unique<rclcpp::executors::SingleThreadedExecutor>();
167174
if (record_options.rmw_serialization_format.empty()) {
168175
record_options.rmw_serialization_format = std::string(rmw_get_serialization_format());
169176
}
@@ -173,20 +180,42 @@ class Recorder
173180
std::move(writer), storage_options, record_options);
174181
recorder->record();
175182

176-
exec_->add_node(recorder);
177-
// Release the GIL for long-running record, so that calling Python code can use other threads
183+
exec->add_node(recorder);
184+
// Run exec->spin() in a separate thread, because we need to call exec->cancel() after
185+
// recorder->stop() to be able to send notifications about bag split and close.
186+
auto spin_thread = std::thread(
187+
[&exec]() {
188+
exec->spin();
189+
});
178190
{
191+
// Release the GIL for long-running record, so that calling Python code can use other threads
179192
py::gil_scoped_release release;
180-
exec_->spin();
193+
std::unique_lock<std::mutex> lock(wait_for_exit_mutex_);
194+
wait_for_exit_cv_.wait(lock, [] {return rosbag2_py::Recorder::exit_.load();});
195+
recorder->stop();
196+
}
197+
exec->cancel();
198+
if (spin_thread.joinable()) {
199+
spin_thread.join();
181200
}
201+
exec->remove_node(recorder);
182202
}
183203

184-
void cancel()
204+
static void cancel()
185205
{
186-
exec_->cancel();
206+
exit_ = true;
207+
wait_for_exit_cv_.notify_all();
187208
}
209+
210+
protected:
211+
static std::atomic_bool exit_;
212+
static std::condition_variable wait_for_exit_cv_;
213+
std::mutex wait_for_exit_mutex_;
188214
};
189215

216+
std::atomic_bool Recorder::exit_{false};
217+
std::condition_variable Recorder::wait_for_exit_cv_{};
218+
190219
// Simple wrapper to read the output config YAML into structs
191220
void bag_rewrite(
192221
const std::vector<rosbag2_storage::StorageOptions> & input_options,
@@ -283,8 +312,9 @@ PYBIND11_MODULE(_transport, m) {
283312

284313
py::class_<rosbag2_py::Recorder>(m, "Recorder")
285314
.def(py::init())
286-
.def("record", &rosbag2_py::Recorder::record)
287-
.def("cancel", &rosbag2_py::Recorder::cancel)
315+
.def(
316+
"record", &rosbag2_py::Recorder::record, py::arg("storage_options"), py::arg("record_options"))
317+
.def_static("cancel", &rosbag2_py::Recorder::cancel)
288318
;
289319

290320
m.def(

rosbag2_py/test/test_transport.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,3 +85,4 @@ def test_record_cancel(tmp_path):
8585
db3_path = Path(bag_path) / 'test_record_cancel_0.db3'
8686
assert wait_for(lambda: metadata_path.is_file() and db3_path.is_file(),
8787
timeout=rclpy.duration.Duration(seconds=3))
88+
record_thread.join()

0 commit comments

Comments
 (0)