20 changes: 20 additions & 0 deletions ros2bag/ros2bag/verb/record.py
@@ -159,6 +159,23 @@ def add_recorder_arguments(parser: ArgumentParser) -> None:
'Default: %(default)d, recording written in single bagfile and splitting '
'is disabled. If both splitting by size and duration are enabled, '
'the bag will split at whichever threshold is reached first.')
parser.add_argument(
'--max-splits', type=int, default=0,
help='Maximum number of splits to retain before deleting the oldest bagfile '
'(circular logging). Default: %(default)d, unlimited. '
'Requires --max-bag-size or --max-bag-duration to be set.')
parser.add_argument(
'--max-record-size', type=int, default=0,
help='Maximum total size in bytes for the entire recording across all splits. '
'Oldest bagfiles will be deleted when this limit is exceeded (circular logging). '
'Default: %(default)d, no limit on total recording size. '
'Requires --max-bag-size or --max-bag-duration to be set.')
parser.add_argument(
'--max-record-duration', type=int, default=0,
help='Maximum total duration in seconds for the entire recording across all splits. '
'Oldest bagfiles will be deleted when this limit is exceeded (circular logging). '
'Default: %(default)d, no limit on total recording duration. '
'Requires --max-bag-size or --max-bag-duration to be set.')
Comment on lines +167 to +178
Contributor
I am not against this PR, but I am not sure these two optional arguments (--max-record-size and --max-record-duration) need to be supported by rosbag2, because I think they can be calculated easily from the --max-splits option above: the retained total is roughly max-splits × max-bag-size (or × max-bag-duration). Using --max-splits with --max-bag-size or --max-bag-duration would be more straightforward for the user experience.

Besides that,

  • there are (already) too many optional arguments for ros2 bag record... this generates more and more complication.
  • additional error handling needs to be implemented in the rosbag2 system, e.g., what if the user accidentally sets --max-record-size to a smaller value than --max-bag-size?
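
The second bullet could be met with a small cross-option check at recorder setup. Below is a minimal illustrative sketch, not part of this PR: the two size parameters mirror the options added in this diff, but the helper name and its placement are assumptions.

#include <cstdint>
#include <stdexcept>
#include <string>

// Hypothetical validation of the circular-logging options: a total-recording
// cap smaller than a single bagfile could never be satisfied after the first
// split, so reject it up front.
void validate_circular_logging_options(uint64_t max_bagfile_size, uint64_t max_record_size)
{
  if (max_record_size != 0 && max_record_size < max_bagfile_size) {
    throw std::invalid_argument(
      "--max-record-size (" + std::to_string(max_record_size) +
      ") must not be smaller than --max-bag-size (" +
      std::to_string(max_bagfile_size) + ")");
  }
}

On the first bullet, the same retention can indeed be expressed with --max-splits alone: for example, ros2 bag record -a --max-bag-size 104857600 --max-splits 5 retains at most the five newest ~100 MiB files, about 500 MiB in total.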

parser.add_argument(
'--max-cache-size', type=int, default=100 * 1024 * 1024,
help='Maximum size (in bytes) of messages to hold in each buffer of cache. '
@@ -364,6 +381,9 @@ def main(self, *, args): # noqa: D102
max_bagfile_size=args.max_bag_size,
max_bagfile_duration=args.max_bag_duration,
max_cache_size=args.max_cache_size,
max_splits=args.max_splits,
max_record_size=args.max_record_size,
max_record_duration=args.max_record_duration,
storage_preset_profile=args.storage_preset_profile,
storage_config_uri=storage_config_file,
snapshot_mode=args.snapshot_mode,
@@ -557,6 +557,7 @@ TEST_P(SequentialCompressionWriterTest, split_event_calls_callback_with_msg_comp
}
}


TEST_P(SequentialCompressionWriterTest, split_event_calls_callback_with_file_compression)
{
const uint64_t max_bagfile_size = 3;
@@ -628,6 +629,131 @@ TEST_P(SequentialCompressionWriterTest, split_event_calls_callback_with_file_compression)
}
}

TEST_F(SequentialCompressionWriterTest,
circular_logging_limits_number_of_files_by_max_splits_with_file_compression)
{
const uint64_t max_bagfile_size = 3; // split frequently
const uint64_t max_splits = 3; // retain at most 3 files
const int message_count = 40;

rosbag2_compression::CompressionOptions compression_options {
DefaultTestCompressor,
rosbag2_compression::CompressionMode::FILE,
0,
1,
kDefaultCompressionQueueThreadsPriority
};

initializeFakeFileStorage();
initializeWriter(compression_options);

tmp_dir_storage_options_.max_cache_size = 0;
tmp_dir_storage_options_.max_bagfile_size = max_bagfile_size;
tmp_dir_storage_options_.max_splits = max_splits;
tmp_dir_storage_options_.max_record_size = 0;
tmp_dir_storage_options_.max_record_duration = 0;

writer_->open(tmp_dir_storage_options_);
writer_->create_topic({0u, "test_topic", "test_msgs/BasicTypes", "", {}, ""});

auto message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
message->topic_name = "test_topic";
for (int i = 0; i < message_count; ++i) {
writer_->write(message);
}
writer_.reset();

ASSERT_LE(intercepted_write_metadata_.files.size(), max_splits);
ASSERT_EQ(
intercepted_write_metadata_.files.size(),
intercepted_write_metadata_.relative_file_paths.size());
}

TEST_F(SequentialCompressionWriterTest,
circular_logging_limits_total_size_by_max_record_size_with_file_compression)
{
const uint64_t max_bagfile_size = 3; // split frequently
const uint64_t max_record_size = 5; // very small total size to force pruning
const int message_count = 40;

rosbag2_compression::CompressionOptions compression_options {
DefaultTestCompressor,
rosbag2_compression::CompressionMode::FILE,
0,
1,
kDefaultCompressionQueueThreadsPriority
};

initializeFakeFileStorage();
initializeWriter(compression_options);

tmp_dir_storage_options_.max_cache_size = 0;
tmp_dir_storage_options_.max_bagfile_size = max_bagfile_size;
tmp_dir_storage_options_.max_record_size = max_record_size; // enable size-based circular limit
tmp_dir_storage_options_.max_record_duration = 0;
tmp_dir_storage_options_.max_splits = 0;

writer_->open(tmp_dir_storage_options_);
writer_->create_topic({0u, "test_topic", "test_msgs/BasicTypes", "", {}, ""});

auto message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
message->topic_name = "test_topic";
for (int i = 0; i < message_count; ++i) {
writer_->write(message);
}
writer_.reset();

// The fake storage tracks size in message-count units, so use per-file message_count as a proxy for bytes
uint64_t total_units = 0;
for (const auto & fi : intercepted_write_metadata_.files) {
total_units += static_cast<uint64_t>(fi.message_count);
}
ASSERT_LE(total_units, max_record_size);
}

TEST_F(SequentialCompressionWriterTest,
circular_logging_limits_total_duration_by_max_record_duration_with_file_compression)
{
const uint64_t max_bagfile_size = 3; // split frequently
const uint64_t max_record_duration_sec = 3; // small total duration
const int message_count = 20;

rosbag2_compression::CompressionOptions compression_options {
DefaultTestCompressor,
rosbag2_compression::CompressionMode::FILE,
0,
1,
kDefaultCompressionQueueThreadsPriority
};

initializeFakeFileStorage();
initializeWriter(compression_options);

tmp_dir_storage_options_.max_cache_size = 0;
tmp_dir_storage_options_.max_bagfile_size = max_bagfile_size;
tmp_dir_storage_options_.max_record_size = 0;
tmp_dir_storage_options_.max_record_duration = max_record_duration_sec;
tmp_dir_storage_options_.max_splits = 0;

writer_->open(tmp_dir_storage_options_);
writer_->create_topic({0u, "test_topic", "test_msgs/BasicTypes", "", {}, ""});

for (int i = 0; i < message_count; ++i) {
auto msg = std::make_shared<rosbag2_storage::SerializedBagMessage>();
msg->topic_name = "test_topic";
msg->recv_timestamp = static_cast<rcutils_time_point_value_t>(i) * 1000000000LL; // ns
writer_->write(msg);
}
writer_.reset();

uint64_t total_duration_ns = 0;
for (const auto & fi : intercepted_write_metadata_.files) {
total_duration_ns += static_cast<uint64_t>(fi.duration.count());
}
const uint64_t cap_ns = max_record_duration_sec * 1000000000ULL;
ASSERT_LE(total_duration_ns, cap_ns);
}

TEST_F(SequentialCompressionWriterTest, snapshot_writes_to_new_file_with_file_compression)
{
tmp_dir_storage_options_.max_bagfile_size = 0;
21 changes: 21 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp
@@ -185,6 +185,27 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter

rosbag2_storage::BagMetadata metadata_;

// Tracks the total size of all recorded files across splits
uint64_t total_recorded_size_ {0};

// Tracks the total duration of all recorded files across splits (in nanoseconds)
std::chrono::nanoseconds total_recorded_duration_ {0};

// Cached max record duration in nanoseconds (derived from seconds option at init)
uint64_t max_record_duration_ns_ {0};

// Tracks the next file index to use for sequential numbering (independent of file deletions)
uint64_t next_file_index_ {1};

// Gets the total size of the recording including current and all previous files
uint64_t get_total_record_size() const;

// Gets the total duration of the recording in nanoseconds
uint64_t get_total_record_duration() const;

// Delete oldest bagfiles if total record size or duration exceeds limits
void delete_oldest_files_if_needed();

// Checks if the current recording bagfile needs to be split and rolled over to a new file.
bool should_split_bagfile(
const std::chrono::time_point<std::chrono::high_resolution_clock> & current_time) const;
3 changes: 3 additions & 0 deletions rosbag2_cpp/src/rosbag2_cpp/reindexer.cpp
@@ -182,6 +182,9 @@ void Reindexer::aggregate_metadata(
storage_options.storage_id,
storage_options.max_bagfile_size,
storage_options.max_bagfile_duration,
storage_options.max_splits,
storage_options.max_record_size,
storage_options.max_record_duration,
storage_options.max_cache_size,
storage_options.storage_config_uri
};
118 changes: 117 additions & 1 deletion rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp
@@ -170,6 +170,22 @@ void SequentialWriter::open(

init_metadata();
storage_->update_metadata(metadata_);
total_recorded_size_ = 0;
total_recorded_duration_ = std::chrono::nanoseconds(0);
// Cache max_record_duration in nanoseconds. Guard against overflow when converting seconds->ns.
if (storage_options_.max_record_duration == 0) {
max_record_duration_ns_ = 0ULL;
} else {
const uint64_t max_sec_without_overflow = UINT64_MAX / 1000000000ULL; // 18446744073 s, ~584 years
if (storage_options_.max_record_duration > max_sec_without_overflow) {
max_record_duration_ns_ = UINT64_MAX;
ROSBAG2_CPP_LOG_WARN(
"max_record_duration exceeds max representable ns; clamping to UINT64_MAX ns");
} else {
max_record_duration_ns_ = storage_options_.max_record_duration * 1000000000ULL;
}
}
next_file_index_ = 1; // First file is 0, next will be 1
is_open_ = true;
}
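
For reference, here is a standalone sketch of the same saturating seconds-to-nanoseconds conversion used above (illustrative only; this helper is not part of the PR):

#include <cstdint>
#include <limits>

// Convert a seconds-valued option to nanoseconds, saturating at UINT64_MAX
// instead of overflowing, mirroring the guard in SequentialWriter::open().
uint64_t seconds_to_ns_saturating(uint64_t seconds)
{
  constexpr uint64_t kNsPerSec = 1000000000ULL;
  constexpr uint64_t kMaxSeconds = std::numeric_limits<uint64_t>::max() / kNsPerSec;
  return seconds > kMaxSeconds ? std::numeric_limits<uint64_t>::max() : seconds * kNsPerSec;
}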

@@ -326,9 +342,25 @@ void SequentialWriter::switch_to_next_storage()
}

storage_->update_metadata(metadata_);

// Add current file size and duration to totals before switching
total_recorded_size_ += storage_->get_bagfile_size();
if (!metadata_.files.empty()) {
total_recorded_duration_ += metadata_.files.back().duration;
}

// Check for overflow: if next_file_index_ is 0, we've wrapped around (very unlikely but possible)
if (next_file_index_ == 0) {
ROSBAG2_CPP_LOG_WARN_STREAM(
"File index counter has overflowed (wrapped to 0). "
"This should not happen in practice, but continuing with index 0. "
"If circular logging is enabled, ensure old files are deleted to avoid conflicts.");
}

storage_options_.uri = format_storage_uri(
base_folder_,
-  metadata_.relative_file_paths.size());
+  next_file_index_);
next_file_index_++;
storage_ = storage_factory_->open_read_write(storage_options_);
if (!storage_) {
std::stringstream errmsg;
@@ -416,6 +448,8 @@ void SequentialWriter::write(std::shared_ptr<const rosbag2_storage::SerializedBa
if (!storage_options_.snapshot_mode && should_split_bagfile(message_timestamp)) {
split_bagfile();
metadata_.files.back().starting_time = message_timestamp;
// Delete oldest files if circular buffer limit exceeded (after split creates new file)
delete_oldest_files_if_needed();
}

metadata_.starting_time = std::min(metadata_.starting_time, message_timestamp);
@@ -499,6 +533,88 @@ bool SequentialWriter::should_split_bagfile(
return should_split;
}

uint64_t SequentialWriter::get_total_record_size() const
{
// Total = sum of all previous files + current file
return total_recorded_size_ + storage_->get_bagfile_size();
}

uint64_t SequentialWriter::get_total_record_duration() const
{
// Total = sum of all previous files' durations + current file duration (nanoseconds)
std::chrono::nanoseconds current_file_duration{0};
if (!metadata_.files.empty()) {
current_file_duration = metadata_.files.back().duration;
}
const auto total_duration = total_recorded_duration_ + current_file_duration;
return static_cast<uint64_t>(total_duration.count());
}

void SequentialWriter::delete_oldest_files_if_needed()
{
// Only delete if any circular buffer limit is set (size, duration, or split count)
// Note: This is only called after split_bagfile(), so max_bagfile_size is guaranteed to be set
if (storage_options_.max_record_size == 0 &&
storage_options_.max_record_duration == 0 &&
storage_options_.max_splits == 0)
{
return;
}

// Protect metadata access with mutex to prevent race conditions
std::lock_guard<std::mutex> lock(topics_info_mutex_);

// Delete oldest files until we're under all configured limits (if set)
// Note: We just split, so we have at least 2 files and the oldest is definitely not current
while (metadata_.files.size() > 1) {
// Check if we need to delete based on size limit
bool exceeds_size_limit = (storage_options_.max_record_size != 0 &&
get_total_record_size() > storage_options_.max_record_size);

// Check if we need to delete based on duration limit using cached ns value
bool exceeds_duration_limit = (max_record_duration_ns_ != 0 &&
get_total_record_duration() > max_record_duration_ns_);

// Check if we need to delete based on split-count limit
bool exceeds_split_limit = (storage_options_.max_splits != 0 &&
metadata_.files.size() > storage_options_.max_splits);

// If we're under all limits (or they're not set), stop deleting
if (!exceeds_size_limit && !exceeds_duration_limit && !exceeds_split_limit) {
break;
}

const auto & oldest_file = metadata_.files.front();
const auto file_path = fs::path(base_folder_) / oldest_file.path;

// Calculate duration contribution of this file for updating total duration
const auto file_duration = oldest_file.duration;
const auto file_duration_ns = file_duration.count();

// Delete file from filesystem
if (fs::exists(file_path)) {
const auto file_size = fs::file_size(file_path);
fs::remove(file_path);

total_recorded_size_ -= file_size;
total_recorded_duration_ -= file_duration;
ROSBAG2_CPP_LOG_WARN(
"Deleted oldest bagfile: %s (%llu bytes, %llu ns)",
oldest_file.path.c_str(),
static_cast<unsigned long long>(file_size),
static_cast<unsigned long long>(file_duration_ns));
}

// Remove from metadata
metadata_.relative_file_paths.erase(metadata_.relative_file_paths.begin());
metadata_.files.erase(metadata_.files.begin());
metadata_.starting_time = metadata_.files.front().starting_time;
}

// Update metadata after deletions
if (!metadata_.files.empty()) {
storage_->update_metadata(metadata_);
}
}
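
Note an implied property of this scheme: pruning only runs right after a split, so the current bagfile grows unchecked between splits. Transient disk usage can therefore exceed the configured cap by up to about one bagfile, i.e. roughly

  total_on_disk <= max_record_size + max_bagfile_size

and analogously for max_record_duration plus up to one bagfile's duration. This bound is inferred from the code above rather than stated anywhere in the PR.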

bool SequentialWriter::message_within_accepted_time_range(
const rcutils_time_point_value_t current_time) const
{