Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions rosbag2_transport/include/rosbag2_transport/recorder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,12 @@ class Recorder : public rclcpp::Node
ROSBAG2_TRANSPORT_PUBLIC
virtual ~Recorder();

/// @brief Start recording.
/// The record() method will return almost immediately and recording will happen in background.
/// \brief Start recording.
/// \details The record() method will return almost immediately and recording will happen in
/// background.
/// \param uri If provided, it will override the storage_options.uri provided during construction.
ROSBAG2_TRANSPORT_PUBLIC
void record();
void record(const std::string & uri = "");

/// @brief Add a new channel (topic) to the rosbag2 writer to be recorded.
/// \details This is a direct Recorder API equivalent to the rosbag2_cpp::Writer::add_topic().
Expand Down Expand Up @@ -214,8 +216,8 @@ class Recorder : public rclcpp::Node
uint64_t get_total_num_messages_lost_in_transport() const;

/// @brief Stopping recording.
/// @details The stop() is opposite to the record() operation. It will stop recording, dump
/// all buffers to the disk and close writer. The record() can be called again after stop().
/// @details The stop() is opposite to the record(uri) operation. It will stop recording, dump
/// all buffers to the disk and close writer. The record(uri) can be called again after stop().
ROSBAG2_TRANSPORT_PUBLIC
void stop();

Expand Down
49 changes: 40 additions & 9 deletions rosbag2_transport/src/rosbag2_transport/recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,11 @@ class RecorderImpl

~RecorderImpl();

/// @brief Start recording.
/// The record() method will return almost immediately and recording will happen in background.
void record();
/// \brief Start recording.
/// \details The record(uri) method will return almost immediately and recording will happen in
/// background.
/// \param uri If provided, it will override the storage_options.uri provided during construction.
void record(const std::string & uri = "");

/// @brief Add a new channel (topic) to the rosbag2 writer to be recorded.
/// \details This is a direct Recorder API equivalent to the rosbag2_cpp::Writer::add_topic().
Expand Down Expand Up @@ -156,7 +158,7 @@ class RecorderImpl
uint64_t get_total_num_messages_lost_in_transport() const;

/// @brief Stopping recording and closing writer.
/// The record() can be called again after stop().
/// The record(uri) can be called again after stop().
void stop();

/// Get a const reference to the underlying rosbag2 writer.
Expand Down Expand Up @@ -348,15 +350,19 @@ void RecorderImpl::stop()
}
}

void RecorderImpl::record()
void RecorderImpl::record(const std::string & uri)
{
std::lock_guard<std::mutex> state_lock(start_stop_transition_mutex_);
if (in_recording_.exchange(true)) {
RCLCPP_WARN_STREAM(
node->get_logger(),
"Called Recorder::record() while already in recording, dismissing request.");
"Called Recorder::record(uri) while already in recording, dismissing request.");
return;
}
if (!uri.empty()) {
storage_options_.uri = uri;
}
RCLCPP_INFO(node->get_logger(), "Starting recording to '%s'", storage_options_.uri.c_str());
paused_ = record_options_.start_paused;
topic_qos_profile_overrides_ = record_options_.topic_qos_profile_overrides;
// Check serialization format options
Expand All @@ -383,6 +389,31 @@ void RecorderImpl::record()
subscriptions_.clear();
event_notifier_->reset_total_num_messages_lost_in_transport();
event_notifier_->reset_total_num_messages_lost_in_recorder();

// Check if storage_options.uri already exists and try to add '(n)' postfix
namespace fs = std::filesystem;
fs::path storage_path(storage_options_.uri);
if (fs::is_directory(storage_path)) {
RCLCPP_WARN_STREAM(node->get_logger(),
"Bag directory '" << storage_path.c_str() << "' already exists.");
for (size_t i = 1U; i < std::numeric_limits<size_t>::max(); i++) {
fs::path new_path = storage_path;
new_path += "(" + std::to_string(i) + ")";
if (!fs::exists(new_path)) {
RCLCPP_WARN_STREAM(node->get_logger(),
"Changing bag directory to '" << new_path.c_str() << "'.");
storage_options_.uri = new_path.generic_string();
storage_path = new_path;
break;
}
}
}
if (fs::is_directory(storage_path)) {
throw std::runtime_error{
"Failed to derive non-existent directory for the new Rosbag2 recording. "
"Please specify non existent uri explicitly."};
}

writer_->open(
storage_options_,
{record_options_.input_serialization_format, record_options_.output_serialization_format});
Expand Down Expand Up @@ -513,7 +544,7 @@ void RecorderImpl::start_discovery()
// Get graph event to ensure to start GrapListener if not already started and register event,
// before we're starting discovery thread.
// This is a workaround to split initialization and runtime phases to avoid race condition when
// a new publisher appeared after we finish start_discovery() and/or Recorder::record(),
// a new publisher appeared after we finish start_discovery() and/or Recorder::record(uri),
// but before we really start topics_discovery() thread.
discovery_graph_event_ = node->get_graph_event();
discovery_future_ =
Expand Down Expand Up @@ -993,9 +1024,9 @@ Recorder::Recorder(

Recorder::~Recorder() = default;

void Recorder::record()
void Recorder::record(const std::string & uri)
{
pimpl_->record();
pimpl_->record(uri);
}

void Recorder::add_channel(
Expand Down
36 changes: 36 additions & 0 deletions rosbag2_transport/test/rosbag2_transport/test_record.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,21 @@

#include <gmock/gmock.h>

#include <filesystem>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
#include <rosbag2_storage/ros_helper.hpp>

#include "mock_recorder.hpp"
#include "rclcpp/rclcpp.hpp"
#include "rcpputils/scope_exit.hpp"

#include "rosbag2_test_common/publication_manager.hpp"
#include "rosbag2_test_common/wait_for.hpp"
#include "rosbag2_test_common/temporary_directory_fixture.hpp"

#include "rosbag2_transport/recorder.hpp"

Expand All @@ -34,6 +38,11 @@

#include "rosbag2_storage/qos.hpp"
#include "record_integration_fixture.hpp"
#include "rosbag2_transport/reader_writer_factory.hpp"

using namespace ::testing; // NOLINT
using rosbag2_test_common::TemporaryDirectoryFixture;
namespace fs = std::filesystem;

TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are_recorded)
{
Expand Down Expand Up @@ -126,6 +135,33 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are
}
}

TEST_F(TemporaryDirectoryFixture, can_record_again_after_stop_with_real_storage) {
rclcpp::init(0, nullptr);
auto cleanup_process_handle = rcpputils::make_scope_exit([&]() {rclcpp::shutdown();});
std::string test_topic = "/can_record_again_after_stop_topic";
rosbag2_storage::StorageOptions storage_options{};
storage_options.uri = (fs::path(temporary_dir_path_) / "start_stop_again").generic_string();

rosbag2_transport::RecordOptions record_options{};

auto writer = rosbag2_transport::ReaderWriterFactory::make_writer(record_options);
{
auto recorder = std::make_shared<MockRecorder>(
std::move(writer), storage_options, record_options);

EXPECT_NO_THROW(recorder->record());
fs::path storage_path(storage_options.uri);
EXPECT_TRUE(fs::is_directory(storage_path));

EXPECT_NO_THROW(recorder->stop());
EXPECT_NO_THROW(recorder->record());
storage_path = recorder->get_storage_options().uri;
EXPECT_TRUE(fs::is_directory(storage_path));
std::string expected_path_str = storage_options.uri + "(1)";
EXPECT_EQ(storage_path.generic_string(), expected_path_str);
}
}

TEST_F(RecordIntegrationTestFixture, can_record_again_after_stop)
{
GTEST_SKIP() << "Skipping test `can_record_again_after_stop` in rosbag2_transport, until "
Expand Down
Loading