Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ Buffer recent messages until triggered to write or trigger an already running in
default_duration_limit: 10.0 # [Optional, default=-1] Maximum time difference between newest and oldest message in seconds
default_memory_limit: 64.0 # [Optional, default=-1] Maximum memory used by messages in each topic's buffer, in MB
topics: ["/topic1", "/topic2"] # [Optional] List of topics to buffer. If empty, buffer all topics.
blacklist_topics: ["/topic1"] # [Optional] Only used if topics=[]
blacklist_types: ["sensor_msgs/msg/Image"] # [Optional] Only used if topics=[]
topic_details:
/topic1:
type: "sensor_msgs/msg/NavSatFix" # [Required if topic is specified] Topic type
Expand Down
10 changes: 9 additions & 1 deletion rosbag2_snapshot/include/rosbag2_snapshot/snapshotter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include <string>
#include <utility>
#include <vector>
#include <set>

namespace rosbag2_snapshot
{
Expand Down Expand Up @@ -110,6 +111,8 @@ struct SnapshotterTopicOptions
// Maximum memory usage of the buffer before older messages are removed
int32_t memory_limit_;

rclcpp::QoS qos_;

SnapshotterTopicOptions(
rclcpp::Duration duration_limit = INHERIT_DURATION_LIMIT,
int32_t memory_limit = INHERIT_MEMORY_LIMIT);
Expand All @@ -131,6 +134,10 @@ struct SnapshotterOptions
// Provides list of topics to snapshot and their limit configurations
topics_t topics_;

std::set<std::string> blacklist_topics_;

std::set<std::string> blacklist_types_;

SnapshotterOptions(
rclcpp::Duration default_duration_limit = rclcpp::Duration(30s),
int32_t default_memory_limit = -1);
Expand Down Expand Up @@ -193,7 +200,7 @@ class MessageQueue
typedef std::pair<queue_t::const_iterator, queue_t::const_iterator> range_t;
// Get a begin and end iterator into the buffer respecting the start and
// end timestamp constraints
range_t rangeFromTimes(const rclcpp::Time & start, const rclcpp::Time & end);
range_t rangeFromTimes(const rclcpp::Time & now, const rclcpp::Time & start, const rclcpp::Time & end);

// Return the total message size including the meta-information
int64_t getMessageSize(SnapshotMessage const & msg) const;
Expand Down Expand Up @@ -253,6 +260,7 @@ class Snapshotter : public rclcpp::Node
// Subscribe to one of the topics, setting up the callback to add to the respective queue
void subscribe(
const TopicDetails & topic_details,
const SnapshotterTopicOptions & topic_options,
std::shared_ptr<MessageQueue> queue);
// Called on new message from any configured topic. Adds to queue for that topic
void topicCb(
Expand Down
2 changes: 2 additions & 0 deletions rosbag2_snapshot/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
<buildtool_depend>ament_cmake_auto</buildtool_depend>

<depend>rosbag2_cpp</depend>
<depend>rosbag2_storage</depend>
<depend>rosbag2_transport</depend>
<depend>rosbag2_snapshot_msgs</depend>
<depend>rclcpp</depend>
<depend>rclcpp_components</depend>
Expand Down
201 changes: 173 additions & 28 deletions rosbag2_snapshot/src/snapshotter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@
// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
// POSSIBILITY OF SUCH DAMAGE.

#include <rclcpp/scope_exit.hpp>
#include <rcpputils/scope_exit.hpp>
#include <rclcpp/rclcpp.hpp>
#include <rosbag2_snapshot/snapshotter.hpp>
#include <rosbag2_transport/qos.hpp>

#include <filesystem>

Expand Down Expand Up @@ -67,7 +68,7 @@ static constexpr uint32_t MB_TO_B = 1e6;
SnapshotterTopicOptions::SnapshotterTopicOptions(
rclcpp::Duration duration_limit,
int32_t memory_limit)
: duration_limit_(duration_limit), memory_limit_(memory_limit)
: duration_limit_(duration_limit), memory_limit_(memory_limit), qos_(10)
{
}

Expand All @@ -76,7 +77,9 @@ SnapshotterOptions::SnapshotterOptions(
int32_t default_memory_limit)
: default_duration_limit_(default_duration_limit),
default_memory_limit_(default_memory_limit),
topics_()
topics_(),
blacklist_topics_(),
blacklist_types_()
{
}

Expand Down Expand Up @@ -217,14 +220,21 @@ SnapshotMessage MessageQueue::_pop()
return tmp;
}

MessageQueue::range_t MessageQueue::rangeFromTimes(Time const & start, Time const & stop)
MessageQueue::range_t MessageQueue::rangeFromTimes(Time const & now, Time const & start, Time const & stop)
{
range_t::first_type begin = queue_.begin();
range_t::second_type end = queue_.end();

auto bounded_start = start;

if (!queue_.empty() && options_.duration_limit_ != SnapshotterTopicOptions::NO_DURATION_LIMIT) {
auto lower_bound = now - options_.duration_limit_;
bounded_start = std::max(start, lower_bound);
}

// Increment / Decrement iterators until time contraints are met
if (start.seconds() != 0.0 || start.nanoseconds() != 0) {
while (begin != end && (*begin).time < start) {
if (bounded_start.seconds() != 0.0 || bounded_start.nanoseconds() != 0) {
while (begin != end && (*begin).time < bounded_start) {
++begin;
}
}
Expand Down Expand Up @@ -259,7 +269,7 @@ Snapshotter::Snapshotter(const rclcpp::NodeOptions & options)
buffers_.emplace(details, queue);
assert(res.second);

subscribe(details, queue);
subscribe(details, pair.second, queue);
}

// Now that subscriptions are setup, setup service servers for writing and pausing
Expand Down Expand Up @@ -370,8 +380,40 @@ void Snapshotter::parseOptionsFromParams()
}
} else {
options_.all_topics_ = true;
RCLCPP_INFO(get_logger(), "No topics list provided. Logging all topics.");
RCLCPP_WARN(get_logger(), "Logging all topics is very memory-intensive.");
RCLCPP_INFO(get_logger(), "No topics list provided. Recording everything except blacklisted topics and types.");
RCLCPP_INFO(get_logger(), "Logging all topics is very memory-intensive.");

std::vector<std::string> blacklist_topics{};

try {
blacklist_topics =
declare_parameter<std::vector<std::string>>("blacklist_topics", std::vector<std::string>{});
} catch (const rclcpp::ParameterTypeException & ex) {
if (std::string{ex.what()}.find("not set") == std::string::npos) {
RCLCPP_ERROR(get_logger(), "blacklist_topics must be an array of strings.");
throw ex;
}
}

for (auto topic_name : blacklist_topics) {
options_.blacklist_topics_.insert(topic_name);
}

std::vector<std::string> blacklist_types{};

try {
blacklist_types =
declare_parameter<std::vector<std::string>>("blacklist_types", std::vector<std::string>{});
} catch (const rclcpp::ParameterTypeException & ex) {
if (std::string{ex.what()}.find("not set") == std::string::npos) {
RCLCPP_ERROR(get_logger(), "blacklist_types must be an array of strings.");
throw ex;
}
}

for (auto type_name : blacklist_types) {
options_.blacklist_types_.insert(type_name);
}
}
}

Expand Down Expand Up @@ -425,9 +467,10 @@ void Snapshotter::topicCb(

void Snapshotter::subscribe(
const TopicDetails & topic_details,
const SnapshotterTopicOptions & topic_options,
std::shared_ptr<MessageQueue> queue)
{
RCLCPP_INFO(get_logger(), "Subscribing to %s", topic_details.name.c_str());
RCLCPP_DEBUG(get_logger(), "Subscribing to %s", topic_details.name.c_str());

auto opts = rclcpp::SubscriptionOptions{};
opts.topic_stats_options.state = rclcpp::TopicStatisticsState::Enable;
Expand All @@ -436,7 +479,7 @@ void Snapshotter::subscribe(
auto sub = create_generic_subscription(
topic_details.name,
topic_details.type,
rclcpp::QoS{10},
topic_options.qos_,
std::bind(&Snapshotter::topicCb, this, _1, queue),
opts
);
Expand All @@ -454,12 +497,18 @@ bool Snapshotter::writeTopic(
// acquire lock for this queue
std::lock_guard l(message_queue.lock);

MessageQueue::range_t range = message_queue.rangeFromTimes(req->start_time, req->stop_time);
MessageQueue::range_t range = message_queue.rangeFromTimes(now(), req->start_time, req->stop_time);

SnapshotterTopicOptions topic_options = options_.topics_.at(topic_details);

YAML::Node offered_qos_profiles;
offered_qos_profiles.push_back(rosbag2_transport::Rosbag2QoS(topic_options.qos_));

rosbag2_storage::TopicMetadata tm;
tm.name = topic_details.name;
tm.type = topic_details.type;
tm.serialization_format = "cdr";
tm.offered_qos_profiles = YAML::Dump(offered_qos_profiles);

bag_writer.create_topic(tm);

Expand Down Expand Up @@ -519,15 +568,15 @@ void Snapshotter::triggerSnapshotCb(
}

// Ensure that state is updated when function exits, regardlesss of branch path / exception events
RCLCPP_SCOPE_EXIT(
auto on_exit = rcpputils::make_scope_exit([&]() {
// Clear buffers beacuase time gaps (skipped messages) may have occured while paused
std::unique_lock<std::shared_mutex> write_lock(state_lock_);
// Turn off writing flag and return recording to its state before writing
writing_ = false;
if (recording_prior) {
this->resume();
}
);
});

rosbag2_cpp::Writer bag_writer{};

Expand Down Expand Up @@ -638,29 +687,125 @@ void Snapshotter::pollTopics()

for (const auto & name_type : topic_names_and_types) {
if (name_type.second.size() < 1) {
RCLCPP_ERROR(get_logger(), "Subscribed topic has no associated type.");
return;
RCLCPP_ERROR(get_logger(), "Subscribed topic has no associated type, skipping.");
continue;
}

if (name_type.second.size() > 1) {
RCLCPP_ERROR(get_logger(), "Subscribed topic has more than one associated type.");
return;
RCLCPP_ERROR(get_logger(), "Subscribed topic has more than one associated type, skipping.");
continue;
}

TopicDetails details{};
details.name = name_type.first;
details.type = name_type.second[0];

if (options_.addTopic(details)) {
SnapshotterTopicOptions topic_options;
fixTopicOptions(topic_options);
auto queue = std::make_shared<MessageQueue>(topic_options, get_logger());
if (options_.blacklist_topics_.count(details.name)) {
RCLCPP_DEBUG(get_logger(), "Skipping blacklisted topic: %s", details.name.c_str());
continue;
}

if (options_.blacklist_types_.count(details.type)) {
RCLCPP_DEBUG(get_logger(), "Skipping blacklisted type: %s", details.type.c_str());
continue;
}

bool already_recording = !options_.addTopic(details);

if (already_recording) {
// Already recording this topic
continue;
}

const auto info = get_publishers_info_by_topic(details.name);

// Find all publishers for this topic
if (info.size() == 0) {
RCLCPP_DEBUG(get_logger(), "No publisher info found for topic: %s", details.name.c_str());
continue;
}

std::pair<buffers_t::iterator,
bool> res = buffers_.emplace(details, queue);
assert(res.second);
subscribe(details, queue);
// Find reliability of all publishers
std::vector<rclcpp::ReliabilityPolicy> reliabilities;
std::transform(
info.begin(), info.end(), std::back_inserter(reliabilities),
[](const rclcpp::TopicEndpointInfo & i) { return i.qos_profile().reliability(); });
if (reliabilities.size() == 0) {
RCLCPP_ERROR(get_logger(), "Skipping topic %s because QoS reliability could not be found for it", details.name.c_str());
continue;
}
if (!std::equal(reliabilities.begin() + 1, reliabilities.end(), reliabilities.begin())) {
RCLCPP_ERROR(get_logger(), "Skipping topic %s because it has multiple publishers with different QoS reliabilities", details.name.c_str());
continue;
}
auto reliability = reliabilities.at(0);

// Find durability of all publishers
std::vector<rclcpp::DurabilityPolicy> durabilities;
std::transform(
info.begin(), info.end(), std::back_inserter(durabilities),
[](const rclcpp::TopicEndpointInfo & i) { return i.qos_profile().durability(); });
if (durabilities.size() == 0) {
RCLCPP_ERROR(get_logger(), "Skipping topic %s because QoS durability could not be found for it", details.name.c_str());
continue;
}
auto durability = durabilities.at(0);
bool atleastOneTransientLocal = std::count(durabilities.begin(), durabilities.end(), rclcpp::DurabilityPolicy::TransientLocal) > 0;

// Use default duration limit, with no memory limit, for regular non-transient local topics
SnapshotterTopicOptions topic_options;

topic_options.qos_.reliability(reliability);
topic_options.qos_.durability(durability);
topic_options.duration_limit_ = options_.default_duration_limit_;
topic_options.memory_limit_ = SnapshotterTopicOptions::NO_MEMORY_LIMIT;

if (atleastOneTransientLocal) {
// If topic is transient local, we need to limit memory, but not duration
topic_options.qos_.durability(rclcpp::DurabilityPolicy::TransientLocal);
topic_options.duration_limit_ = SnapshotterTopicOptions::NO_DURATION_LIMIT;
topic_options.memory_limit_ = options_.default_memory_limit_;
}

auto queue = std::make_shared<MessageQueue>(topic_options, get_logger());

std::pair<buffers_t::iterator,
bool> res = buffers_.emplace(details, queue);
assert(res.second);
subscribe(details, topic_options, queue);

auto reliabilityToString = [](const rclcpp::ReliabilityPolicy & r) {
switch (r) {
case rclcpp::ReliabilityPolicy::Reliable:
return "Reliable";
case rclcpp::ReliabilityPolicy::BestEffort:
return "Best Effort";
case rclcpp::ReliabilityPolicy::SystemDefault:
return "System Default";
default:
return "Unknown";
}
};

auto durabilityToString = [](const rclcpp::DurabilityPolicy & d) {
switch (d) {
case rclcpp::DurabilityPolicy::TransientLocal:
return "Transient Local";
case rclcpp::DurabilityPolicy::Volatile:
return "Volatile";
case rclcpp::DurabilityPolicy::SystemDefault:
return "System Default";
default:
return "Unknown";
}
};

auto reliability_str = reliabilityToString(topic_options.qos_.reliability());
auto durability_str = durabilityToString(topic_options.qos_.durability());

RCLCPP_DEBUG(
get_logger(), "Subscribed to new topic: %s with type %s, with reliability %s and durability %s",
details.name.c_str(), details.type.c_str(), reliability_str, durability_str);
}
}

Expand Down Expand Up @@ -748,7 +893,7 @@ SnapshotterClient::SnapshotterClient(const rclcpp::NodeOptions & options)

if (opts.action_ == SnapshotterClientOptions::TRIGGER_WRITE && opts.topics_.size() == 0) {
RCLCPP_INFO(get_logger(), "No topics provided - logging all topics.");
RCLCPP_WARN(get_logger(), "Logging all topics is very memory-intensive.");
RCLCPP_INFO(get_logger(), "Logging all topics is very memory-intensive.");
}

setSnapshotterClientOptions(opts);
Expand All @@ -765,7 +910,7 @@ void SnapshotterClient::setSnapshotterClientOptions(const SnapshotterClientOptio
};
}

TriggerSnapshot::Request::SharedPtr req;
TriggerSnapshot::Request::SharedPtr req = std::make_shared<TriggerSnapshot::Request>();

for (const auto & topic : opts.topics_) {
req->topics.push_back(topic.asMessage());
Expand Down