Skip to content

Commit e0beab9

Browse files
Fix ComQueue dropping oldest Fw::Buffer without returning ownership (#5014)
* Fix ComQueue dropping oldest Fw::Buffer without returning ownership When a buffer queue configured with QUEUE_DROP_OLDEST overflows, the oldest queued Fw::Buffer was silently discarded by Queue::enqueue via CircularBuffer::rotate without returning ownership through bufferReturnOut. This leaked buffer-pool entries, eventually starving dependent communication or telemetry paths. Fix: - Add optional 'discarded' output parameter to Types::Queue::enqueue() that captures the about-to-be-dropped message data via peek before the rotate. - In ComQueue::enqueue(), pass an Fw::Buffer-sized output buffer for buffer queues and invoke bufferReturnOut_out() when DROP_OLDEST fires. Closes #5011 Co-Authored-By: michael.d.starch <michael.d.starch@jpl.nasa.gov> * Add discarded_size assert to Queue::enqueue, use proper Fw::Buffer serialization, add UTs - Queue::enqueue: added discarded_size parameter with FW_ASSERT validating the output buffer is large enough when discarded != nullptr - ComQueue: switched buffer queues from raw reinterpret_cast<U8*>(&buffer) to proper serializeTo/deserializeFrom using Fw::Buffer::SERIALIZED_SIZE. This affects buffQueueIn_handler (serialize before enqueue), the discard path in enqueue(), processQueue() (deserialize after dequeue), and drainQueue() (deserialize after dequeue). - Added 4 Queue UTs: DropOldestDiscardedOutput, DropOldestNullDiscarded, DropNewestIgnoresDiscarded, LIFODropOldestDiscardedOutput - Added 2 ComQueue UTs: ComQueueDropOldestNoBufferReturn (nullptr discarded path), BufferQueueFlushAfterDropOldest (drainQueue deserialization) All 14 Queue UTs and 25 ComQueue UTs pass. Co-Authored-By: michael.d.starch <michael.d.starch@jpl.nasa.gov> * Run clang-format on changed files Co-Authored-By: michael.d.starch <michael.d.starch@jpl.nasa.gov> * Revert Queue changes, fix ComQueue drop-oldest with pre-emptive dequeue Reverted all Utils/Types/Queue changes (discarded parameter, UTs) per review feedback — the fix now lives entirely in ComQueue. ComQueue::enqueue() now checks if a buffer queue is full with DROP_OLDEST before calling Queue::enqueue(). When full, it dequeues the oldest entry first, returns buffer ownership via bufferReturnOut, then enqueues the new message into the now-available slot. This prevents buffer-pool leaks without modifying the Queue API. Also reverted the Fw::Buffer serialization changes (to be done in a separate PR) and removed the added ComQueue/Queue UTs. Updated existing testBufferQueueDropOldestMode assertion from 2 to 3 buffer returns to reflect the fix. All 10 Queue UTs and 22 ComQueue UTs pass. Co-Authored-By: michael.d.starch <michael.d.starch@jpl.nasa.gov> * Add Queue::popFront() to fix LIFO+DROP_OLDEST pre-emptive dequeue Queue::dequeue() respects queue mode (LIFO removes from back), but DROP_OLDEST always discards the front entry. The pre-emptive dequeue in ComQueue was using dequeue(), which would remove the wrong entry for LIFO+DROP_OLDEST queues. Added Queue::popFront() — always peeks offset 0 then rotates, matching the rotate-based removal that Queue::enqueue() uses for DROP_OLDEST. ComQueue::enqueue() now calls popFront() instead of dequeue() for the pre-emptive overflow path. Co-Authored-By: michael.d.starch <michael.d.starch@jpl.nasa.gov> * Rename deqStatus to dequeueStatus to fix spelling abbreviation Co-Authored-By: michael.d.starch <michael.d.starch@jpl.nasa.gov> --------- Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
1 parent fed7efe commit e0beab9

4 files changed

Lines changed: 55 additions & 5 deletions

File tree

Svc/ComQueue/ComQueue.cpp

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -315,8 +315,34 @@ bool ComQueue::enqueue(const FwIndexType queueNum, QueueType queueType, const U8
315315
static_cast<FwIndexType>(queueNum - ((queueType == QueueType::COM_QUEUE) ? 0 : COM_PORT_COUNT));
316316
FW_ASSERT(expectedSize == size, static_cast<FwAssertArgType>(size), static_cast<FwAssertArgType>(expectedSize));
317317
FW_ASSERT(portNum >= 0, static_cast<FwAssertArgType>(portNum));
318+
319+
// For buffer queues with DROP_OLDEST, check if the queue is full before enqueuing.
320+
// If full, dequeue the oldest entry first so we can return buffer ownership before
321+
// Queue::enqueue() silently discards it via rotate. This prevents buffer-pool leaks.
322+
bool preEmptiveOverflow = false;
323+
if (queueType == QueueType::BUFFER_QUEUE) {
324+
Types::Queue& queue = this->m_queues[queueNum];
325+
for (FwIndexType i = 0; i < TOTAL_PORT_COUNT; i++) {
326+
if (this->m_prioritizedList[i].index == queueNum &&
327+
this->m_prioritizedList[i].overflowMode == Types::QUEUE_DROP_OLDEST &&
328+
queue.getQueueSize() >= this->m_prioritizedList[i].depth) {
329+
// Queue is full and will drop oldest; remove the front entry to return ownership.
330+
// popFront() always removes from the front (oldest) regardless of queue mode,
331+
// matching the rotate-based removal that Queue::enqueue() uses for DROP_OLDEST.
332+
Fw::Buffer droppedBuffer;
333+
Fw::SerializeStatus dequeueStatus =
334+
queue.popFront(reinterpret_cast<U8*>(&droppedBuffer), sizeof(droppedBuffer));
335+
FW_ASSERT(dequeueStatus == Fw::FW_SERIALIZE_OK, static_cast<FwAssertArgType>(dequeueStatus));
336+
this->bufferReturnOut_out(portNum, droppedBuffer);
337+
preEmptiveOverflow = true;
338+
break;
339+
}
340+
}
341+
}
342+
318343
Fw::SerializeStatus status = this->m_queues[queueNum].enqueue(data, size);
319-
if (status == Fw::FW_SERIALIZE_NO_ROOM_LEFT || status == Fw::FW_SERIALIZE_DISCARDED_EXISTING) {
344+
if (preEmptiveOverflow || status == Fw::FW_SERIALIZE_NO_ROOM_LEFT ||
345+
status == Fw::FW_SERIALIZE_DISCARDED_EXISTING) {
320346
if (!this->m_throttle[queueNum]) {
321347
this->log_WARNING_HI_QueueOverflow(queueType, portNum);
322348
this->m_throttle[queueNum] = true;

Svc/ComQueue/test/ut/ComQueueTester.cpp

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -829,10 +829,8 @@ void ComQueueTester ::testBufferQueueDropOldestMode() {
829829
// Verify we only have 2 messages
830830
ASSERT_from_dataOut_SIZE(2);
831831

832-
// Verify buffers were returned (2 sent + dropped buffer1)
833-
// Note: When DROP_OLDEST happens, the dropped buffer should NOT be returned
834-
// since it was consumed by the queue
835-
ASSERT_from_bufferReturnOut_SIZE(2);
832+
// Verify buffers were returned (2 sent + 1 dropped buffer1 returned for ownership)
833+
ASSERT_from_bufferReturnOut_SIZE(3);
836834

837835
component.cleanup();
838836
}

Utils/Types/Queue.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,18 @@ Fw::SerializeStatus Queue::dequeue(U8* const message, const FwSizeType size) {
9090
}
9191
}
9292

93+
Fw::SerializeStatus Queue::popFront(U8* const message, const FwSizeType size) {
94+
FW_ASSERT(m_message_size > 0);
95+
FW_ASSERT(m_message_size <= size, static_cast<FwAssertArgType>(size), static_cast<FwAssertArgType>(m_message_size));
96+
FW_ASSERT(message != nullptr);
97+
// Always remove from the front (oldest), regardless of queue mode
98+
Fw::SerializeStatus result = m_internal.peek(message, m_message_size, 0);
99+
if (result != Fw::FW_SERIALIZE_OK) {
100+
return result;
101+
}
102+
return m_internal.rotate(m_message_size);
103+
}
104+
93105
FwSizeType Queue::get_high_water_mark() const {
94106
FW_ASSERT(m_message_size > 0, static_cast<FwAssertArgType>(m_message_size));
95107
return m_internal.get_high_water_mark() / m_message_size;

Utils/Types/Queue.hpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,20 @@ class Queue {
101101
*/
102102
Fw::SerializeStatus dequeue(U8* const message, const FwSizeType size);
103103

104+
/**
105+
* \brief removes and returns the oldest (front) message regardless of queue mode
106+
*
107+
* Unlike dequeue(), which respects the queue mode (FIFO vs LIFO), this method always
108+
* removes from the front of the queue. This is needed when callers must capture the
109+
* oldest entry before it is discarded (e.g. returning buffer ownership on DROP_OLDEST
110+
* overflow).
111+
*
112+
* \param message: buffer of at least m_message_size bytes to receive the oldest message
113+
* \param size: size of the buffer being supplied
114+
* \return: Fw::SERIALIZE_OK on success, something else on failure
115+
*/
116+
Fw::SerializeStatus popFront(U8* const message, const FwSizeType size);
117+
104118
/**
105119
* Return the largest tracked allocated size
106120
*/

0 commit comments

Comments
 (0)