Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,14 @@ class ROSBAG2_CPP_PUBLIC CircularMessageCache
~CircularMessageCache() override;

/// Puts msg into circular buffer, replacing the oldest msg when buffer is full
<<<<<<< HEAD
void push(std::shared_ptr<const rosbag2_storage::SerializedBagMessage> msg) override;
=======
/// \return True if message was successfully pushed, otherwise false.
/// NOTE: Unless message is null or too large for the buffer, this will always return true
/// since the circular buffer by design drops old messages when the buffer is full.
bool push(std::shared_ptr<const rosbag2_storage::SerializedBagMessage> msg) override;
>>>>>>> 572f98e (Check for nullptrs when pushing new messages to the message cache (#2219))

/// Get current buffer to consume.
/// Locks consumer buffer until release_consumer_buffer is called.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,12 @@ class ROSBAG2_CPP_PUBLIC MessageCacheCircularBuffer
explicit MessageCacheCircularBuffer(size_t max_cache_size);

/**
* If buffer size has some space left, we push the message regardless of its size,
* but if this results in exceeding buffer size, we begin dropping old messages.
* \brief Pushes a SerializedBagMessage into the cache buffer.
* \details If buffer size has some space left, we push the message regardless of its size,
* but if this results in exceeding buffer size, we begin dropping old messages.
* \param msg SerializedBagMessage to add to the buffer.
* \return True if message was successfully pushed. Returns false if msg is null or if msg size
* exceeds max buffer size.
*/
bool push(CacheBufferInterface::buffer_element_t msg) override;

Expand Down
4 changes: 4 additions & 0 deletions rosbag2_cpp/src/rosbag2_cpp/cache/circular_message_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ CircularMessageCache::~CircularMessageCache()
void CircularMessageCache::push(std::shared_ptr<const rosbag2_storage::SerializedBagMessage> msg)
{
std::lock_guard<std::mutex> cache_lock(producer_buffer_mutex_);
<<<<<<< HEAD
producer_buffer_->push(msg);
=======
return producer_buffer_->push(msg);
>>>>>>> 572f98e (Check for nullptrs when pushing new messages to the message cache (#2219))
}

std::shared_ptr<CacheBufferInterface> CircularMessageCache::get_consumer_buffer()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,14 @@ MessageCacheCircularBuffer::MessageCacheCircularBuffer(size_t max_cache_size)

bool MessageCacheCircularBuffer::push(CacheBufferInterface::buffer_element_t msg)
{
if (!msg || !msg->serialized_data) {
ROSBAG2_CPP_LOG_ERROR("Attempted to push null message into circular buffer. Dropping message!");
return false;
}

// Drop message if it exceeds the buffer size
if (msg->serialized_data->buffer_length > max_bytes_size_) {
ROSBAG2_CPP_LOG_WARN_STREAM("Last message exceeds snapshot buffer size. Dropping message!");
ROSBAG2_CPP_LOG_WARN("Last message exceeds snapshot buffer size. Dropping message!");
return false;
}

Expand Down
9 changes: 9 additions & 0 deletions rosbag2_cpp/test/rosbag2_cpp/test_circular_message_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,12 @@ TEST_F(CircularMessageCacheTest, circular_message_cache_ensure_empty) {
EXPECT_THAT(circular_message_cache->get_consumer_buffer()->size(), Eq(0u));
circular_message_cache->release_consumer_buffer();
}

TEST_F(CircularMessageCacheTest, circular_message_cache_rejects_null_message) {
auto circular_message_cache_ = std::make_shared<rosbag2_cpp::cache::CircularMessageCache>(
cache_size_);

bool result = true;
ASSERT_NO_THROW(result = circular_message_cache_->push(nullptr));
EXPECT_FALSE(result);
}
Loading