diff --git a/rosbag2_compression/include/rosbag2_compression/sequential_compression_writer.hpp b/rosbag2_compression/include/rosbag2_compression/sequential_compression_writer.hpp index 7a42426e05..c50ff24e0e 100644 --- a/rosbag2_compression/include/rosbag2_compression/sequential_compression_writer.hpp +++ b/rosbag2_compression/include/rosbag2_compression/sequential_compression_writer.hpp @@ -72,33 +72,33 @@ class ROSBAG2_COMPRESSION_PUBLIC SequentialCompressionWriter ~SequentialCompressionWriter() override; /** - * Create a new topic in the underlying storage. Needs to be called for every topic used within - * a message which is passed to write(...). - * + * \brief Create a new topic in the underlying storage. + * \details Needs to be called for every topic used within a message which is passed + * to \ref write "write(...)". + * \note If writer is not open, this will just store the topic information locally and + * topics will be created on storage open. * \param topic_with_type name and type identifier of topic to be created - * \throws runtime_error if the Writer is not open. */ void create_topic(const rosbag2_storage::TopicMetadata & topic_with_type) override; /** - * Create a new topic in the underlying storage. Needs to be called for every topic used within - * a message which is passed to write(...). - * + * \brief Create a new topic in the underlying storage. + * \details Needs to be called for every topic used within a message which is passed + * to \ref write "write(...)". + * \note If writer is not open, this will just store the topic information locally and + * topics will be created on storage open. * \param topic_with_type name and type identifier of topic to be created - * \param message_definition definition of topic_with_type.type - * \throws runtime_error if the Writer is not open. + * \param message_definition message definition content for this topic's type */ void create_topic( const rosbag2_storage::TopicMetadata & topic_with_type, const rosbag2_storage::MessageDefinition & message_definition) override; /** - * Remove a new topic in the underlying storage. - * If creation of subscription fails remove the topic - * from the db (more of cleanup) - * + * \brief Removes a new topic in the underlying storage. + * \details Expected to be used if creation of subscription fails and cleanup is needed. + * \note If writer is not open, this will just remove the topic information locally. * \param topic_with_type name and type identifier of topic to be created - * \throws runtime_error if the Writer is not open. */ void remove_topic(const rosbag2_storage::TopicMetadata & topic_with_type) override; @@ -115,9 +115,9 @@ class ROSBAG2_COMPRESSION_PUBLIC SequentialCompressionWriter void write(std::shared_ptr message) override; /** - * Opens a new bagfile and prepare it for writing messages. The bagfile must not exist. - * This must be called before any other function is used. - * + * \brief Opens a new bagfile and prepare it for writing messages. The bagfile must not exist. + * \details This must be called before any other function is used among \ref create_topic + * and \ref remove_topic. * \param storage_options Options to configure the storage * \param converter_options options to define in which format incoming messages are stored **/ diff --git a/rosbag2_cpp/include/rosbag2_cpp/writer.hpp b/rosbag2_cpp/include/rosbag2_cpp/writer.hpp index 521c2f80d3..a8e8e4dd9c 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/writer.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/writer.hpp @@ -63,9 +63,9 @@ class ROSBAG2_CPP_PUBLIC Writer ~Writer(); /** - * Opens a new bagfile and prepare it for writing messages. The bagfile must not exist. - * This must be called before any other function is used. - * + * \brief Opens a new bagfile and prepare it for writing messages. The bagfile must not exist. + * \details This must be called before any other function is used among \ref create_topic + * and \ref remove_topic. * \note This will open URI with the default storage options * * using default storage backend * * using no converter options, storing messages with the incoming serialization format @@ -78,9 +78,9 @@ class ROSBAG2_CPP_PUBLIC Writer void open(const std::string & uri); /** - * Opens a new bagfile and prepare it for writing messages. The bagfile must not exist. - * This must be called before any other function is used. - * + * \brief Opens a new bagfile and prepare it for writing messages. The bagfile must not exist. + * \details This must be called before any other function is used among \ref create_topic + * and \ref remove_topic. * \param storage_options Options to configure the storage * \param converter_options options to define in which format incoming messages are stored **/ @@ -94,21 +94,23 @@ class ROSBAG2_CPP_PUBLIC Writer void close(); /** - * Create a new topic in the underlying storage. Needs to be called for every topic used within - * a message which is passed to write(...). - * + * \brief Create a new topic in the underlying storage. + * \details Needs to be called for every topic used within a message which is passed + * to \ref write "write(...)". + * \note If writer is not open, this will just store the topic information locally and + * topics will be created on storage open. * \param topic_with_type name and type identifier of topic to be created - * \throws runtime_error if the Writer is not open. */ void create_topic(const rosbag2_storage::TopicMetadata & topic_with_type); /** - * Create a new topic in the underlying storage. Needs to be called for every topic used within - * a message which is passed to write(...). - * + * \brief Create a new topic in the underlying storage. + * \details Needs to be called for every topic used within a message which is passed + * to \ref write "write(...)". + * \note If writer is not open, this will just store the topic information locally and + * topics will be created on storage open. * \param topic_with_type name and type identifier of topic to be created * \param message_definition message definition content for this topic's type - * \throws runtime_error if the Writer is not open. */ void create_topic( const rosbag2_storage::TopicMetadata & topic_with_type, @@ -126,12 +128,10 @@ class ROSBAG2_CPP_PUBLIC Writer void split_bagfile(); /** - * Remove a new topic in the underlying storage. - * If creation of subscription fails remove the topic - * from the db (more of cleanup) - * + * \brief Removes a new topic in the underlying storage. + * \details Expected to be used if creation of subscription fails and cleanup is needed. + * \note If writer is not open, this will just remove the topic information locally. * \param topic_with_type name and type identifier of topic to be created - * \throws runtime_error if the Writer is not open. */ void remove_topic(const rosbag2_storage::TopicMetadata & topic_with_type); diff --git a/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp b/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp index 06478fd56b..f4076a92d1 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp @@ -72,9 +72,9 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter ~SequentialWriter() override; /** - * Opens a new bagfile and prepare it for writing messages. The bagfile must not exist. - * This must be called before any other function is used. - * + * \brief Opens a new bagfile and prepare it for writing messages. The bagfile must not exist. + * \details This must be called before any other function is used among \ref create_topic + * and \ref remove_topic. * \param storage_options Options to configure the storage * \param converter_options options to define in which format incoming messages are stored **/ @@ -85,33 +85,33 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter void close() override; /** - * Create a new topic in the underlying storage. Needs to be called for every topic used within - * a message which is passed to write(...). - * + * \brief Create a new topic in the underlying storage. + * \details Needs to be called for every topic used within a message which is passed + * to \ref write "write(...)". + * \note If writer is not open, this will just store the topic information locally and + * topics will be created on storage open. * \param topic_with_type name and type identifier of topic to be created - * \throws runtime_error if the Writer is not open. */ void create_topic(const rosbag2_storage::TopicMetadata & topic_with_type) override; /** - * Create a new topic in the underlying storage. Needs to be called for every topic used within - * a message which is passed to write(...). - * + * \brief Create a new topic in the underlying storage. + * \details Needs to be called for every topic used within a message which is passed + * to \ref write "write(...)". + * \note If writer is not open, this will just store the topic information locally and + * topics will be created on storage open. * \param topic_with_type name and type identifier of topic to be created * \param message_definition message definition content for this topic's type - * \throws runtime_error if the Writer is not open. */ void create_topic( const rosbag2_storage::TopicMetadata & topic_with_type, const rosbag2_storage::MessageDefinition & message_definition) override; /** - * Remove a new topic in the underlying storage. - * If creation of subscription fails remove the topic - * from the db (more of cleanup) - * + * \brief Removes a new topic in the underlying storage. + * \details Expected to be used if creation of subscription fails and cleanup is needed. + * \note If writer is not open, this will just remove the topic information locally. * \param topic_with_type name and type identifier of topic to be created - * \throws runtime_error if the Writer is not open. */ void remove_topic(const rosbag2_storage::TopicMetadata & topic_with_type) override; @@ -171,11 +171,17 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter rosbag2_storage::StorageOptions storage_options_; - // Used to track topic -> message count. If cache is present, it is updated by CacheConsumer + /// \brief Topic name to the TopicInformation map. + /// Used to keep topic list and track message counts. If cache is present, the message + /// counts updated by CacheConsumer. + /// \note The map is persisted across bagfile splits and writer close()->open() operations. + /// However, the message counts inside TopicInformation are reset to zero on close() and open(). + /// \note topics_names_to_info_ needs to be protected with \sa topics_info_mutex_ only when we + /// are explicitly adding or deleting items (create_topic(..)/remove_topic(..)) and when we access + /// it from CacheConsumer callback i.e., write_messages(..). In all other cases like in write(..) + /// it is safe to access without topics_info_mutex_ losck as all external API calls are protected + /// with \sa writer_mutex_ on \sa rosbag2_cpp::Writer level. std::unordered_map topics_names_to_info_; - // Note: topics_names_to_info_ needs to be protected with mutex only when we are explicitly - // adding or deleting items (create_topic(..)/remove_topic(..)) and when we access it from - // CacheConsumer callback i.e., write_messages(..) std::mutex topics_info_mutex_; LocalMessageDefinitionSource message_definitions_; diff --git a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp index 787784dd52..1aa1a13890 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp @@ -56,8 +56,6 @@ SequentialWriter::SequentialWriter( storage_(nullptr), metadata_io_(std::move(metadata_io)), converter_(nullptr), - topics_names_to_info_(), - topic_names_to_message_definitions_(), metadata_() {} @@ -169,6 +167,18 @@ void SequentialWriter::open( } init_metadata(); + // Register topics in storage if they already exists + metadata_.topics_with_message_count.clear(); + metadata_.topics_with_message_count.reserve(topics_names_to_info_.size()); + for (auto & [topic_name, topic_info] : topics_names_to_info_) { + topic_info.message_count = 0U; + auto const & md = topic_names_to_message_definitions_[topic_name]; + storage_->create_topic(topic_info.topic_metadata, md); + metadata_.topics_with_message_count.push_back(topic_info); + if (converter_) { + converter_->add_topic(topic_name, topic_info.topic_metadata.type); + } + } storage_->update_metadata(metadata_); is_open_ = true; } @@ -205,19 +215,20 @@ void SequentialWriter::close() execute_bag_split_callbacks(closed_file, ""); } - topics_names_to_info_.clear(); - topic_names_to_message_definitions_.clear(); + // Zero message counts for all topics + std::lock_guard lock(topics_info_mutex_); + for (auto & [_, topic_info] : topics_names_to_info_) { + topic_info.message_count = 0U; + } converter_.reset(); } void SequentialWriter::create_topic(const rosbag2_storage::TopicMetadata & topic_with_type) { - if (topics_names_to_info_.find(topic_with_type.name) != - topics_names_to_info_.end()) - { - // nothing to do, topic already created - return; + // Don't need to lock topics_info_mutex_ since we are not modifying topics_names_to_info_ here + if (topics_names_to_info_.find(topic_with_type.name) != topics_names_to_info_.end()) { + return; // nothing to do, topic already created } rosbag2_storage::MessageDefinition definition = message_definitions_.get_full_text_ext(topic_with_type.type, topic_with_type.name); @@ -241,65 +252,39 @@ void SequentialWriter::create_topic( const rosbag2_storage::TopicMetadata & topic_with_type, const rosbag2_storage::MessageDefinition & message_definition) { - if (topics_names_to_info_.find(topic_with_type.name) != - topics_names_to_info_.end()) - { - // nothing to do, topic already created - return; - } - - if (!is_open_) { - throw std::runtime_error("Bag is not open. Call open() before writing."); - } - rosbag2_storage::TopicInformation info{}; - info.topic_metadata = topic_with_type; - - bool insert_succeeded = false; { std::lock_guard lock(topics_info_mutex_); - const auto insert_res = topics_names_to_info_.insert( - std::make_pair(topic_with_type.name, info)); - insert_succeeded = insert_res.second; - } - - if (!insert_succeeded) { - std::stringstream errmsg; - errmsg << "Failed to insert topic \"" << topic_with_type.name << "\"!"; - - throw std::runtime_error(errmsg.str()); + if (topics_names_to_info_.find(topic_with_type.name) != topics_names_to_info_.end()) { + return; // nothing to do, topic already created + } + info.topic_metadata = topic_with_type; + (void)topics_names_to_info_.insert({topic_with_type.name, info}); + (void)topic_names_to_message_definitions_.insert({topic_with_type.name, message_definition}); } - topic_names_to_message_definitions_.insert( - std::make_pair(topic_with_type.name, message_definition)); - - storage_->create_topic(topic_with_type, message_definition); - - if (converter_) { - converter_->add_topic(topic_with_type.name, topic_with_type.type); + if (is_open_.load()) { + storage_->create_topic(topic_with_type, message_definition); + metadata_.topics_with_message_count.push_back(info); + if (converter_) { + converter_->add_topic(topic_with_type.name, topic_with_type.type); + } } } void SequentialWriter::remove_topic(const rosbag2_storage::TopicMetadata & topic_with_type) { - if (!is_open_) { - throw std::runtime_error("Bag is not open. Call open() before removing."); - } - - bool erased = false; - { - std::lock_guard lock(topics_info_mutex_); - erased = topics_names_to_info_.erase(topic_with_type.name) > 0; - erased = erased && (topic_names_to_message_definitions_.erase(topic_with_type.name) > 0); - } + std::lock_guard lock(topics_info_mutex_); + bool erased = topics_names_to_info_.erase(topic_with_type.name) > 0; + erased = erased && (topic_names_to_message_definitions_.erase(topic_with_type.name) > 0); if (erased) { - storage_->remove_topic(topic_with_type); + if (is_open_.load()) { + storage_->remove_topic(topic_with_type); + } } else { std::stringstream errmsg; - errmsg << "Failed to remove the non-existing topic \"" << - topic_with_type.name << "\"!"; - + errmsg << "Failed to remove the non-existing topic \"" << topic_with_type.name << "\"!"; throw std::runtime_error(errmsg.str()); } } @@ -329,6 +314,9 @@ void SequentialWriter::switch_to_next_storage() storage_options_.uri = format_storage_uri( base_folder_, metadata_.relative_file_paths.size()); + // TODO(morlov): If we would ever remove the upper level writer mutex lock, consider protecting + // storage_ with mutex to avoid race conditions with write(msg) call when we are switching to + // next storage and not using cache. storage_ = storage_factory_->open_read_write(storage_options_); if (!storage_) { std::stringstream errmsg; @@ -345,10 +333,13 @@ void SequentialWriter::switch_to_next_storage() metadata_.relative_file_paths.push_back(file_info.path); storage_->update_metadata(metadata_); - // Re-register all topics since we rolled-over to a new bagfile. - for (const auto & topic : topics_names_to_info_) { - auto const & md = topic_names_to_message_definitions_[topic.first]; - storage_->create_topic(topic.second.topic_metadata, md); + { + // Re-register all topics since we rolled-over to a new bagfile. + std::lock_guard lock(topics_info_mutex_); + for (const auto & topic : topics_names_to_info_) { + auto const & md = topic_names_to_message_definitions_[topic.first]; + storage_->create_topic(topic.second.topic_metadata, md); + } } if (use_cache_) { @@ -394,12 +385,13 @@ void SequentialWriter::write(std::shared_ptrtopic_name); - } catch (const std::out_of_range & /* oor */) { + rosbag2_storage::TopicInformation * topic_information_ptr{nullptr}; + const auto & topic_name = message->topic_name; + if (const auto it = topics_names_to_info_.find(topic_name); it != topics_names_to_info_.end()) { + topic_information_ptr = &(it->second); + } else { std::stringstream errmsg; - errmsg << "Failed to write on topic '" << message->topic_name << + errmsg << "Failed to write on topic '" << topic_name << "'. Call create_topic() before first write."; throw std::runtime_error(errmsg.str()); } @@ -436,7 +428,7 @@ void SequentialWriter::write(std::shared_ptrwrite_message(converted_msg)) { metadata_.files.back().message_count++; - topic_information->message_count++; + topic_information_ptr->message_count++; } else { message_lost = true; } diff --git a/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp b/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp index 2914b95def..b43ac8f3ae 100644 --- a/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp +++ b/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp @@ -169,6 +169,48 @@ std::shared_ptr make_test_msg() return message; } +TEST_F(SequentialWriterTest, create_topic_does_not_throw_if_writer_not_open) { + auto sequential_writer = std::make_unique( + std::move(storage_factory_), converter_factory_, std::move(metadata_io_)); + writer_ = std::make_unique(std::move(sequential_writer)); + + rosbag2_storage::TopicMetadata topic_metadata{0u, "topic1", "test_msgs/BasicTypes", "", {}, ""}; + + EXPECT_NO_THROW(writer_->create_topic(topic_metadata)); +} + +TEST_F(SequentialWriterTest, remove_topic_does_not_throw_if_writer_not_open) { + auto sequential_writer = std::make_unique( + std::move(storage_factory_), converter_factory_, std::move(metadata_io_)); + writer_ = std::make_unique(std::move(sequential_writer)); + + rosbag2_storage::TopicMetadata topic_metadata{0u, "topic1", "test_msgs/BasicTypes", "", {}, ""}; + + EXPECT_NO_THROW(writer_->create_topic(topic_metadata)); + EXPECT_NO_THROW(writer_->remove_topic(topic_metadata)); +} + +TEST_F(SequentialWriterTest, topics_persist_between_close_and_open) { + auto sequential_writer = std::make_unique( + std::move(storage_factory_), converter_factory_, std::move(metadata_io_)); + writer_ = std::make_unique(std::move(sequential_writer)); + + auto test_message = make_test_msg(); + rosbag2_storage::TopicMetadata topic_metadata { + 0U, test_message->topic_name, "test_msgs/BasicTypes", "", {}, "" + }; + + writer_->create_topic(topic_metadata); + writer_->open(storage_options_); + writer_->close(); + + // Reopen the writer and verify the topic still exists. i.e., writing a message without + // recreating the topic. + storage_options_.uri += "(1)"; // Add suffix to create a new bag folder in the scope of the test + writer_->open(storage_options_); + EXPECT_NO_THROW(writer_->write(test_message)); +} + TEST_F( SequentialWriterTest, write_uses_converters_to_convert_serialization_format_if_input_and_output_format_are_different) {