diff --git a/rosbag2_transport/include/rosbag2_transport/recorder.hpp b/rosbag2_transport/include/rosbag2_transport/recorder.hpp index 16f90de62..684f82cf5 100644 --- a/rosbag2_transport/include/rosbag2_transport/recorder.hpp +++ b/rosbag2_transport/include/rosbag2_transport/recorder.hpp @@ -117,10 +117,12 @@ class Recorder : public rclcpp::Node ROSBAG2_TRANSPORT_PUBLIC virtual ~Recorder(); - /// @brief Start recording. - /// The record() method will return almost immediately and recording will happen in background. + /// \brief Start recording. + /// \details The record() method will return almost immediately and recording will happen in + /// background. + /// \param uri If provided, it will override the storage_options.uri provided during construction. ROSBAG2_TRANSPORT_PUBLIC - void record(); + void record(const std::string & uri = ""); /// @brief Add a new channel (topic) to the rosbag2 writer to be recorded. /// \details This is a direct Recorder API equivalent to the rosbag2_cpp::Writer::add_topic(). @@ -214,8 +216,8 @@ class Recorder : public rclcpp::Node uint64_t get_total_num_messages_lost_in_transport() const; /// @brief Stopping recording. - /// @details The stop() is opposite to the record() operation. It will stop recording, dump - /// all buffers to the disk and close writer. The record() can be called again after stop(). + /// @details The stop() is opposite to the record(uri) operation. It will stop recording, dump + /// all buffers to the disk and close writer. The record(uri) can be called again after stop(). ROSBAG2_TRANSPORT_PUBLIC void stop(); diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.cpp b/rosbag2_transport/src/rosbag2_transport/recorder.cpp index a28ffcd00..f2a7af109 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.cpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.cpp @@ -63,9 +63,11 @@ class RecorderImpl ~RecorderImpl(); - /// @brief Start recording. - /// The record() method will return almost immediately and recording will happen in background. - void record(); + /// \brief Start recording. + /// \details The record(uri) method will return almost immediately and recording will happen in + /// background. + /// \param uri If provided, it will override the storage_options.uri provided during construction. + void record(const std::string & uri = ""); /// @brief Add a new channel (topic) to the rosbag2 writer to be recorded. /// \details This is a direct Recorder API equivalent to the rosbag2_cpp::Writer::add_topic(). @@ -156,7 +158,7 @@ class RecorderImpl uint64_t get_total_num_messages_lost_in_transport() const; /// @brief Stopping recording and closing writer. - /// The record() can be called again after stop(). + /// The record(uri) can be called again after stop(). void stop(); /// Get a const reference to the underlying rosbag2 writer. @@ -348,15 +350,19 @@ void RecorderImpl::stop() } } -void RecorderImpl::record() +void RecorderImpl::record(const std::string & uri) { std::lock_guard state_lock(start_stop_transition_mutex_); if (in_recording_.exchange(true)) { RCLCPP_WARN_STREAM( node->get_logger(), - "Called Recorder::record() while already in recording, dismissing request."); + "Called Recorder::record(uri) while already in recording, dismissing request."); return; } + if (!uri.empty()) { + storage_options_.uri = uri; + } + RCLCPP_INFO(node->get_logger(), "Starting recording to '%s'", storage_options_.uri.c_str()); paused_ = record_options_.start_paused; topic_qos_profile_overrides_ = record_options_.topic_qos_profile_overrides; // Check serialization format options @@ -383,6 +389,31 @@ void RecorderImpl::record() subscriptions_.clear(); event_notifier_->reset_total_num_messages_lost_in_transport(); event_notifier_->reset_total_num_messages_lost_in_recorder(); + + // Check if storage_options.uri already exists and try to add '(n)' postfix + namespace fs = std::filesystem; + fs::path storage_path(storage_options_.uri); + if (fs::is_directory(storage_path)) { + RCLCPP_WARN_STREAM(node->get_logger(), + "Bag directory '" << storage_path.c_str() << "' already exists."); + for (size_t i = 1U; i < std::numeric_limits::max(); i++) { + fs::path new_path = storage_path; + new_path += "(" + std::to_string(i) + ")"; + if (!fs::exists(new_path)) { + RCLCPP_WARN_STREAM(node->get_logger(), + "Changing bag directory to '" << new_path.c_str() << "'."); + storage_options_.uri = new_path.generic_string(); + storage_path = new_path; + break; + } + } + } + if (fs::is_directory(storage_path)) { + throw std::runtime_error{ + "Failed to derive non-existent directory for the new Rosbag2 recording. " + "Please specify non existent uri explicitly."}; + } + writer_->open( storage_options_, {record_options_.input_serialization_format, record_options_.output_serialization_format}); @@ -513,7 +544,7 @@ void RecorderImpl::start_discovery() // Get graph event to ensure to start GrapListener if not already started and register event, // before we're starting discovery thread. // This is a workaround to split initialization and runtime phases to avoid race condition when - // a new publisher appeared after we finish start_discovery() and/or Recorder::record(), + // a new publisher appeared after we finish start_discovery() and/or Recorder::record(uri), // but before we really start topics_discovery() thread. discovery_graph_event_ = node->get_graph_event(); discovery_future_ = @@ -993,9 +1024,9 @@ Recorder::Recorder( Recorder::~Recorder() = default; -void Recorder::record() +void Recorder::record(const std::string & uri) { - pimpl_->record(); + pimpl_->record(uri); } void Recorder::add_channel( diff --git a/rosbag2_transport/test/rosbag2_transport/test_record.cpp b/rosbag2_transport/test/rosbag2_transport/test_record.cpp index c1075df93..459ec0bee 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record.cpp @@ -14,6 +14,7 @@ #include +#include #include #include #include @@ -21,10 +22,13 @@ #include #include +#include "mock_recorder.hpp" #include "rclcpp/rclcpp.hpp" +#include "rcpputils/scope_exit.hpp" #include "rosbag2_test_common/publication_manager.hpp" #include "rosbag2_test_common/wait_for.hpp" +#include "rosbag2_test_common/temporary_directory_fixture.hpp" #include "rosbag2_transport/recorder.hpp" @@ -34,6 +38,11 @@ #include "rosbag2_storage/qos.hpp" #include "record_integration_fixture.hpp" +#include "rosbag2_transport/reader_writer_factory.hpp" + +using namespace ::testing; // NOLINT +using rosbag2_test_common::TemporaryDirectoryFixture; +namespace fs = std::filesystem; TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are_recorded) { @@ -126,6 +135,33 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are } } +TEST_F(TemporaryDirectoryFixture, can_record_again_after_stop_with_real_storage) { + rclcpp::init(0, nullptr); + auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {rclcpp::shutdown();}); + std::string test_topic = "/can_record_again_after_stop_topic"; + rosbag2_storage::StorageOptions storage_options{}; + storage_options.uri = (fs::path(temporary_dir_path_) / "start_stop_again").generic_string(); + + rosbag2_transport::RecordOptions record_options{}; + + auto writer = rosbag2_transport::ReaderWriterFactory::make_writer(record_options); + { + auto recorder = std::make_shared( + std::move(writer), storage_options, record_options); + + EXPECT_NO_THROW(recorder->record()); + fs::path storage_path(storage_options.uri); + EXPECT_TRUE(fs::is_directory(storage_path)); + + EXPECT_NO_THROW(recorder->stop()); + EXPECT_NO_THROW(recorder->record()); + storage_path = recorder->get_storage_options().uri; + EXPECT_TRUE(fs::is_directory(storage_path)); + std::string expected_path_str = storage_options.uri + "(1)"; + EXPECT_EQ(storage_path.generic_string(), expected_path_str); + } +} + TEST_F(RecordIntegrationTestFixture, can_record_again_after_stop) { GTEST_SKIP() << "Skipping test `can_record_again_after_stop` in rosbag2_transport, until "