diff --git a/README.md b/README.md index cb6d7f6..1e7f5f7 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,8 @@ Buffer recent messages until triggered to write or trigger an already running in default_duration_limit: 10.0 # [Optional, default=-1] Maximum time difference between newest and oldest message in seconds default_memory_limit: 64.0 # [Optional, default=-1] Maximum memory used by messages in each topic's buffer, in MB topics: ["/topic1", "/topic2"] # [Optional] List of topics to buffer. If empty, buffer all topics. + blacklist_topics: ["/topic1"] # [Optional] Only used if topics=[] + blacklist_types: ["sensor_msgs/msg/Image"] # [Optional] Only used if topics=[] topic_details: /topic1: type: "sensor_msgs/msg/NavSatFix" # [Required if topic is specified] Topic type diff --git a/rosbag2_snapshot/include/rosbag2_snapshot/snapshotter.hpp b/rosbag2_snapshot/include/rosbag2_snapshot/snapshotter.hpp index 1a587be..cd7be8c 100644 --- a/rosbag2_snapshot/include/rosbag2_snapshot/snapshotter.hpp +++ b/rosbag2_snapshot/include/rosbag2_snapshot/snapshotter.hpp @@ -43,6 +43,7 @@ #include #include #include +#include namespace rosbag2_snapshot { @@ -110,6 +111,8 @@ struct SnapshotterTopicOptions // Maximum memory usage of the buffer before older messages are removed int32_t memory_limit_; + rclcpp::QoS qos_; + SnapshotterTopicOptions( rclcpp::Duration duration_limit = INHERIT_DURATION_LIMIT, int32_t memory_limit = INHERIT_MEMORY_LIMIT); @@ -131,6 +134,10 @@ struct SnapshotterOptions // Provides list of topics to snapshot and their limit configurations topics_t topics_; + std::set blacklist_topics_; + + std::set blacklist_types_; + SnapshotterOptions( rclcpp::Duration default_duration_limit = rclcpp::Duration(30s), int32_t default_memory_limit = -1); @@ -193,7 +200,7 @@ class MessageQueue typedef std::pair range_t; // Get a begin and end iterator into the buffer respecting the start and // end timestamp constraints - range_t rangeFromTimes(const rclcpp::Time & start, const rclcpp::Time & end); + range_t rangeFromTimes(const rclcpp::Time & now, const rclcpp::Time & start, const rclcpp::Time & end); // Return the total message size including the meta-information int64_t getMessageSize(SnapshotMessage const & msg) const; @@ -253,6 +260,7 @@ class Snapshotter : public rclcpp::Node // Subscribe to one of the topics, setting up the callback to add to the respective queue void subscribe( const TopicDetails & topic_details, + const SnapshotterTopicOptions & topic_options, std::shared_ptr queue); // Called on new message from any configured topic. Adds to queue for that topic void topicCb( diff --git a/rosbag2_snapshot/package.xml b/rosbag2_snapshot/package.xml index 9706a18..eef96e9 100644 --- a/rosbag2_snapshot/package.xml +++ b/rosbag2_snapshot/package.xml @@ -12,6 +12,8 @@ ament_cmake_auto rosbag2_cpp + rosbag2_storage + rosbag2_transport rosbag2_snapshot_msgs rclcpp rclcpp_components diff --git a/rosbag2_snapshot/src/snapshotter.cpp b/rosbag2_snapshot/src/snapshotter.cpp index 8d0681f..10524c8 100644 --- a/rosbag2_snapshot/src/snapshotter.cpp +++ b/rosbag2_snapshot/src/snapshotter.cpp @@ -26,9 +26,10 @@ // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE // POSSIBILITY OF SUCH DAMAGE. -#include +#include #include #include +#include #include @@ -67,7 +68,7 @@ static constexpr uint32_t MB_TO_B = 1e6; SnapshotterTopicOptions::SnapshotterTopicOptions( rclcpp::Duration duration_limit, int32_t memory_limit) -: duration_limit_(duration_limit), memory_limit_(memory_limit) +: duration_limit_(duration_limit), memory_limit_(memory_limit), qos_(10) { } @@ -76,7 +77,9 @@ SnapshotterOptions::SnapshotterOptions( int32_t default_memory_limit) : default_duration_limit_(default_duration_limit), default_memory_limit_(default_memory_limit), - topics_() + topics_(), + blacklist_topics_(), + blacklist_types_() { } @@ -217,14 +220,21 @@ SnapshotMessage MessageQueue::_pop() return tmp; } -MessageQueue::range_t MessageQueue::rangeFromTimes(Time const & start, Time const & stop) +MessageQueue::range_t MessageQueue::rangeFromTimes(Time const & now, Time const & start, Time const & stop) { range_t::first_type begin = queue_.begin(); range_t::second_type end = queue_.end(); + auto bounded_start = start; + + if (!queue_.empty() && options_.duration_limit_ != SnapshotterTopicOptions::NO_DURATION_LIMIT) { + auto lower_bound = now - options_.duration_limit_; + bounded_start = std::max(start, lower_bound); + } + // Increment / Decrement iterators until time contraints are met - if (start.seconds() != 0.0 || start.nanoseconds() != 0) { - while (begin != end && (*begin).time < start) { + if (bounded_start.seconds() != 0.0 || bounded_start.nanoseconds() != 0) { + while (begin != end && (*begin).time < bounded_start) { ++begin; } } @@ -259,7 +269,7 @@ Snapshotter::Snapshotter(const rclcpp::NodeOptions & options) buffers_.emplace(details, queue); assert(res.second); - subscribe(details, queue); + subscribe(details, pair.second, queue); } // Now that subscriptions are setup, setup service servers for writing and pausing @@ -370,8 +380,40 @@ void Snapshotter::parseOptionsFromParams() } } else { options_.all_topics_ = true; - RCLCPP_INFO(get_logger(), "No topics list provided. Logging all topics."); - RCLCPP_WARN(get_logger(), "Logging all topics is very memory-intensive."); + RCLCPP_INFO(get_logger(), "No topics list provided. Recording everything except blacklisted topics and types."); + RCLCPP_INFO(get_logger(), "Logging all topics is very memory-intensive."); + + std::vector blacklist_topics{}; + + try { + blacklist_topics = + declare_parameter>("blacklist_topics", std::vector{}); + } catch (const rclcpp::ParameterTypeException & ex) { + if (std::string{ex.what()}.find("not set") == std::string::npos) { + RCLCPP_ERROR(get_logger(), "blacklist_topics must be an array of strings."); + throw ex; + } + } + + for (auto topic_name : blacklist_topics) { + options_.blacklist_topics_.insert(topic_name); + } + + std::vector blacklist_types{}; + + try { + blacklist_types = + declare_parameter>("blacklist_types", std::vector{}); + } catch (const rclcpp::ParameterTypeException & ex) { + if (std::string{ex.what()}.find("not set") == std::string::npos) { + RCLCPP_ERROR(get_logger(), "blacklist_types must be an array of strings."); + throw ex; + } + } + + for (auto type_name : blacklist_types) { + options_.blacklist_types_.insert(type_name); + } } } @@ -425,9 +467,10 @@ void Snapshotter::topicCb( void Snapshotter::subscribe( const TopicDetails & topic_details, + const SnapshotterTopicOptions & topic_options, std::shared_ptr queue) { - RCLCPP_INFO(get_logger(), "Subscribing to %s", topic_details.name.c_str()); + RCLCPP_DEBUG(get_logger(), "Subscribing to %s", topic_details.name.c_str()); auto opts = rclcpp::SubscriptionOptions{}; opts.topic_stats_options.state = rclcpp::TopicStatisticsState::Enable; @@ -436,7 +479,7 @@ void Snapshotter::subscribe( auto sub = create_generic_subscription( topic_details.name, topic_details.type, - rclcpp::QoS{10}, + topic_options.qos_, std::bind(&Snapshotter::topicCb, this, _1, queue), opts ); @@ -454,12 +497,18 @@ bool Snapshotter::writeTopic( // acquire lock for this queue std::lock_guard l(message_queue.lock); - MessageQueue::range_t range = message_queue.rangeFromTimes(req->start_time, req->stop_time); + MessageQueue::range_t range = message_queue.rangeFromTimes(now(), req->start_time, req->stop_time); + + SnapshotterTopicOptions topic_options = options_.topics_.at(topic_details); + + YAML::Node offered_qos_profiles; + offered_qos_profiles.push_back(rosbag2_transport::Rosbag2QoS(topic_options.qos_)); rosbag2_storage::TopicMetadata tm; tm.name = topic_details.name; tm.type = topic_details.type; tm.serialization_format = "cdr"; + tm.offered_qos_profiles = YAML::Dump(offered_qos_profiles); bag_writer.create_topic(tm); @@ -519,7 +568,7 @@ void Snapshotter::triggerSnapshotCb( } // Ensure that state is updated when function exits, regardlesss of branch path / exception events - RCLCPP_SCOPE_EXIT( + auto on_exit = rcpputils::make_scope_exit([&]() { // Clear buffers beacuase time gaps (skipped messages) may have occured while paused std::unique_lock write_lock(state_lock_); // Turn off writing flag and return recording to its state before writing @@ -527,7 +576,7 @@ void Snapshotter::triggerSnapshotCb( if (recording_prior) { this->resume(); } - ); + }); rosbag2_cpp::Writer bag_writer{}; @@ -638,29 +687,125 @@ void Snapshotter::pollTopics() for (const auto & name_type : topic_names_and_types) { if (name_type.second.size() < 1) { - RCLCPP_ERROR(get_logger(), "Subscribed topic has no associated type."); - return; + RCLCPP_ERROR(get_logger(), "Subscribed topic has no associated type, skipping."); + continue; } if (name_type.second.size() > 1) { - RCLCPP_ERROR(get_logger(), "Subscribed topic has more than one associated type."); - return; + RCLCPP_ERROR(get_logger(), "Subscribed topic has more than one associated type, skipping."); + continue; } TopicDetails details{}; details.name = name_type.first; details.type = name_type.second[0]; - if (options_.addTopic(details)) { - SnapshotterTopicOptions topic_options; - fixTopicOptions(topic_options); - auto queue = std::make_shared(topic_options, get_logger()); + if (options_.blacklist_topics_.count(details.name)) { + RCLCPP_DEBUG(get_logger(), "Skipping blacklisted topic: %s", details.name.c_str()); + continue; + } + + if (options_.blacklist_types_.count(details.type)) { + RCLCPP_DEBUG(get_logger(), "Skipping blacklisted type: %s", details.type.c_str()); + continue; + } + + bool already_recording = !options_.addTopic(details); + + if (already_recording) { + // Already recording this topic + continue; + } + + const auto info = get_publishers_info_by_topic(details.name); + + // Find all publishers for this topic + if (info.size() == 0) { + RCLCPP_DEBUG(get_logger(), "No publisher info found for topic: %s", details.name.c_str()); + continue; + } - std::pair res = buffers_.emplace(details, queue); - assert(res.second); - subscribe(details, queue); + // Find reliability of all publishers + std::vector reliabilities; + std::transform( + info.begin(), info.end(), std::back_inserter(reliabilities), + [](const rclcpp::TopicEndpointInfo & i) { return i.qos_profile().reliability(); }); + if (reliabilities.size() == 0) { + RCLCPP_ERROR(get_logger(), "Skipping topic %s because QoS reliability could not be found for it", details.name.c_str()); + continue; } + if (!std::equal(reliabilities.begin() + 1, reliabilities.end(), reliabilities.begin())) { + RCLCPP_ERROR(get_logger(), "Skipping topic %s because it has multiple publishers with different QoS reliabilities", details.name.c_str()); + continue; + } + auto reliability = reliabilities.at(0); + + // Find durability of all publishers + std::vector durabilities; + std::transform( + info.begin(), info.end(), std::back_inserter(durabilities), + [](const rclcpp::TopicEndpointInfo & i) { return i.qos_profile().durability(); }); + if (durabilities.size() == 0) { + RCLCPP_ERROR(get_logger(), "Skipping topic %s because QoS durability could not be found for it", details.name.c_str()); + continue; + } + auto durability = durabilities.at(0); + bool atleastOneTransientLocal = std::count(durabilities.begin(), durabilities.end(), rclcpp::DurabilityPolicy::TransientLocal) > 0; + + // Use default duration limit, with no memory limit, for regular non-transient local topics + SnapshotterTopicOptions topic_options; + + topic_options.qos_.reliability(reliability); + topic_options.qos_.durability(durability); + topic_options.duration_limit_ = options_.default_duration_limit_; + topic_options.memory_limit_ = SnapshotterTopicOptions::NO_MEMORY_LIMIT; + + if (atleastOneTransientLocal) { + // If topic is transient local, we need to limit memory, but not duration + topic_options.qos_.durability(rclcpp::DurabilityPolicy::TransientLocal); + topic_options.duration_limit_ = SnapshotterTopicOptions::NO_DURATION_LIMIT; + topic_options.memory_limit_ = options_.default_memory_limit_; + } + + auto queue = std::make_shared(topic_options, get_logger()); + + std::pair res = buffers_.emplace(details, queue); + assert(res.second); + subscribe(details, topic_options, queue); + + auto reliabilityToString = [](const rclcpp::ReliabilityPolicy & r) { + switch (r) { + case rclcpp::ReliabilityPolicy::Reliable: + return "Reliable"; + case rclcpp::ReliabilityPolicy::BestEffort: + return "Best Effort"; + case rclcpp::ReliabilityPolicy::SystemDefault: + return "System Default"; + default: + return "Unknown"; + } + }; + + auto durabilityToString = [](const rclcpp::DurabilityPolicy & d) { + switch (d) { + case rclcpp::DurabilityPolicy::TransientLocal: + return "Transient Local"; + case rclcpp::DurabilityPolicy::Volatile: + return "Volatile"; + case rclcpp::DurabilityPolicy::SystemDefault: + return "System Default"; + default: + return "Unknown"; + } + }; + + auto reliability_str = reliabilityToString(topic_options.qos_.reliability()); + auto durability_str = durabilityToString(topic_options.qos_.durability()); + + RCLCPP_DEBUG( + get_logger(), "Subscribed to new topic: %s with type %s, with reliability %s and durability %s", + details.name.c_str(), details.type.c_str(), reliability_str, durability_str); } } @@ -748,7 +893,7 @@ SnapshotterClient::SnapshotterClient(const rclcpp::NodeOptions & options) if (opts.action_ == SnapshotterClientOptions::TRIGGER_WRITE && opts.topics_.size() == 0) { RCLCPP_INFO(get_logger(), "No topics provided - logging all topics."); - RCLCPP_WARN(get_logger(), "Logging all topics is very memory-intensive."); + RCLCPP_INFO(get_logger(), "Logging all topics is very memory-intensive."); } setSnapshotterClientOptions(opts); @@ -765,7 +910,7 @@ void SnapshotterClient::setSnapshotterClientOptions(const SnapshotterClientOptio }; } - TriggerSnapshot::Request::SharedPtr req; + TriggerSnapshot::Request::SharedPtr req = std::make_shared(); for (const auto & topic : opts.topics_) { req->topics.push_back(topic.asMessage());