Skip to content

Conversation

@dorjesinpo
Copy link
Collaborator

When RelayQueueEngine calls catchUp for a single App, the PushStream may have a gap resulting in the App first element being behind common iterator.
A solution is to introduce a seqeunce number (only for PushStream messages)
For RootQueueEngine, catchUp still can rely on comparing current guid against the common iterator.
Since both engines use the same mqbi::StorageIterator interface, the duality is expressed as bdlb::Variant<bsls::Types::Uint64, bmqt::MessageGUID>

@dorjesinpo dorjesinpo requested a review from a team as a code owner December 11, 2025 01:41
@dorjesinpo dorjesinpo added the bug Something isn't working label Dec 11, 2025
@dorjesinpo dorjesinpo changed the title stop catchup before reaching common iterator Fix[MQB]: stop catchup before reaching common iterator Dec 11, 2025
@678098 678098 self-requested a review December 15, 2025 15:14
bsls::TimeInterval* delay,
mqbi::StorageIterator* reader,
mqbi::StorageIterator* start,
const bdlb::Variant<bsls::Types::Uint64, bmqt::MessageGUID>& stop)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed with @dorjesinpo, I will try to find a way to provide stop without building a Variant object. Template arg might work well, however, there might be cleaner ways to refactor the code

Copy link
Collaborator

@678098 678098 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Counter thing introduces a multi-threading bug

@dorjesinpo dorjesinpo force-pushed the fix/stop-catchup branch 3 times, most recently from 77e816f to 75ad47e Compare December 18, 2025 14:46
Copy link

@bmq-oss-ci bmq-oss-ci bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Build 3170 of commit 75ad47e has completed with FAILURE

// cannot erase the GUID because of the d_iterator

if (d_iterator->second.numElements() == 0) {
if (message().numElements() == 0) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

message() accessor now checks that !atEnd(), the same check happens at line 124. It means that the same thing is being checked twice during removeCurrentElement call

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. We can remove the assert in message() if we want to. Everywhere it is called, there is the same (duplicate) check prior to the call. But, it looks safer to keep the assert. If we want optimization, we can do something about asserts maybe.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

const PushStream::Message& message() const is a protected API, we fully control how to use it from within this class. All its usages are visible and easy to review.
I think it's worth to add checks where we use the class from external code but it's not worth to check any action in internal API, if we have trust in object's state due to the checks that were already performed.

clearCache();

if (d_iterator->second.numElements() == 0) {
if (message().numElements() == 0) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

!atEnd is checked twice now during advance call

if (d_currentOrdinal > appOrdinal) {
d_currentOrdinal = 0;
d_currentElement = d_iterator->second.front();
d_currentElement = message().d_appMessages.front();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

!atEnd is checked twice now in this function

Comment on lines 180 to 181
mqbblp::PushStreamIterator* d_start;
mqbblp::PushStreamIterator* d_stop;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
mqbblp::PushStreamIterator* d_start;
mqbblp::PushStreamIterator* d_stop;
mqbblp::PushStreamIterator* d_start_p;
mqbblp::PushStreamIterator* d_stop_p;

Comment on lines 81 to 82
mqbi::StorageIterator* d_start;
mqbi::StorageIterator* d_stop;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
mqbi::StorageIterator* d_start;
mqbi::StorageIterator* d_stop;
mqbi::StorageIterator* d_start_p;
mqbi::StorageIterator* d_stop_p;

, d_stop(stop)
, d_doAdvance(false)
{
// NOTHING
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// NOTHING
// PRECONDITIONS
BSLS_ASSERT_SAFE(d_start_p);
BSLS_ASSERT_SAFE(d_end_p);

Signed-off-by: dorjesinpo <[email protected]>
Copy link

@bmq-oss-ci bmq-oss-ci bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Build 3175 of commit c080fc3 has completed with FAILURE

@678098 678098 self-requested a review January 6, 2026 19:41
Copy link
Collaborator

@678098 678098 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A couple more comments

/// Return number of Elements in the list
unsigned int numElements() const;

bsls::Types::Uint64 sequenceNumber() const;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason to keep data signed and accessor unsigned? Let's keep both unsigned?

// cannot erase the GUID because of the d_iterator

if (d_iterator->second.numElements() == 0) {
if (message().numElements() == 0) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

const PushStream::Message& message() const is a protected API, we fully control how to use it from within this class. All its usages are visible and easy to review.
I think it's worth to add checks where we use the class from external code but it's not worth to check any action in internal API, if we have trust in object's state due to the checks that were already performed.

return 0;
}

if (!d_stop_p->atEnd()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We only advance d_start_p. This means that the result of d_stop_p->atEnd() never changes unless we pass a pointer to the same mqbi::StorageIterator:

VirtualIterator iter(start_p, start_p);

In this case we will return 0 on line 206 and never reach the check if (!d_stop_p->atEnd()) {

It is possible to compute d_stop_p->sequenceNumber() just once in constructor:

    const bsls::Types::Uint64 d_stopSequenceNumber;

    VirtualIterator(mqbblp::PushStreamIterator* start,
                    mqbblp::PushStreamIterator* stop)
    : d_start_p(start)
    , d_stopSequenceNumber((stop && !stop->atEnd()) ? stop->sequenceNumber() : bsl::numeric_limits<bsls::Types::Uint64>::max())
    , d_doAdvance(false)
    {
        // PRECONDITIONS
        BSLS_ASSERT_SAFE(d_start_p);
    }

And simplify the condition:

if (d_stopSequenceNumber <= d_start_p->sequenceNumber()) {
    return 0;
}

Note that when we call next() multiple times, we always check 2 if conditions, after this change we will only check 1.

}

if (!d_stop_p->atEnd()) {
if (d_start_p->guid() == d_stop_p->guid()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here. When we call next() multiple times we always check these 2 conditions. We can just cache MessageGuid and check one:

if (d_start_p->guid() == d_stopGuid) {
    return 0;
}

// this prevents file set from closing possibly for a very long time.
// Make sure to invalidate any cached data within this iterator after use.
// TODO: refactor iterators to remove cached data.
d_start_p->clearCache();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we need to do the same for RelayQueueEngine::VirtualIterator?

@678098 678098 assigned dorjesinpo and unassigned 678098 Jan 6, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

bug Something isn't working

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants