Skip to content

Fix[BMQ]: unsafe schemalearner use #618

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 12 additions & 14 deletions src/groups/bmq/bmqa/bmqa_message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include <bmqimp_queue.h>
#include <bmqp_event.h>
#include <bmqp_eventutil.h>
#include <bmqp_messageproperties.h>
#include <bmqp_protocolutil.h>
#include <bmqp_queueid.h>
#include <bmqt_resultcode.h>
Expand Down Expand Up @@ -497,10 +496,11 @@ int Message::loadProperties(MessageProperties* buffer) const
BSLS_ASSERT_SAFE(isInitialized());
BSLS_ASSERT_SAFE(buffer);

bmqp::MessageProperties** propertiesImpl =
reinterpret_cast<bmqp::MessageProperties**>(buffer);
bmqp::MessageProperties* propertiesImpl =
*reinterpret_cast<bmqp::MessageProperties**>(buffer);

const bmqp::Event& rawEvent = d_impl.d_event_p->rawEvent();
bmqimp::Event* event = d_impl.d_event_p;
const bmqp::Event& rawEvent = event->rawEvent();
int rc = -1;

if (rawEvent.isPushEvent()) {
Expand All @@ -509,32 +509,30 @@ int Message::loadProperties(MessageProperties* buffer) const
d_impl.d_queueId);

if (queue->id() == bmqimp::Queue::k_INVALID_QUEUE_ID) {
queue = d_impl.d_event_p->lookupQueue();
queue = event->lookupQueue();
}
BSLS_ASSERT_SAFE(queue);

bdlbb::Blob propertiesBlob;
const bmqp::PushMessageIterator& it =
*d_impl.d_event_p->pushMessageIterator();
const bmqp::PushMessageIterator& it = *event->pushMessageIterator();
it.loadMessageProperties(&propertiesBlob);

bmqp::MessagePropertiesInfo input(it.header());

rc = queue->schemaLearner().read(queue->schemaLearnerContext(),
*propertiesImpl,
input,
propertiesBlob);
rc = propertiesImpl->streamIn(propertiesBlob,
input,
d_impl.d_schema_sp);

// Forcibly load all properties.
// REVISIT; this can be removed once MPs support dynamic modification
if (rc == 0) {
rc = 1000 *
(*propertiesImpl)->loadProperties(false, input.isExtended());
propertiesImpl->loadProperties(false, input.isExtended());
}
}
else if (rawEvent.isPutEvent()) {
rc = d_impl.d_event_p->putMessageIterator()->loadMessageProperties(
*propertiesImpl);
rc = event->putMessageIterator()->loadMessageProperties(
propertiesImpl);
// Schema learning for PUSHs only, not for PUTs assuming 'Message'
// builds PUTs and receives PUSHs.
}
Expand Down
8 changes: 8 additions & 0 deletions src/groups/bmq/bmqa/bmqa_message.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ namespace bmqimp {
class Event;
}

namespace bmqp {
class MessageProperties_Schema;
}

namespace bmqa {

// FORWARD DECLARATION
Expand All @@ -100,6 +104,8 @@ class MessageProperties;
/// `correlationId`), then they should be reset in
/// `bmqa::MessageIterator.nextMessage()`.
struct MessageImpl {
// FORWARD DECLARATION

// PUBLIC DATA

/// Pointer to the Event this message is associated with
Expand All @@ -117,6 +123,8 @@ struct MessageImpl {
/// SubscriptionHandle this message is associated with
bmqt::SubscriptionHandle d_subscriptionHandle;

bsl::shared_ptr<const bmqp::MessageProperties_Schema> d_schema_sp;

#ifdef BMQ_ENABLE_MSG_GROUPID
/// Optional GroupId this message is associated with
bsl::string d_groupId;
Expand Down
12 changes: 6 additions & 6 deletions src/groups/bmq/bmqa/bmqa_message.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ static void test2_validPushMessagePrint()
true);
implPtr->configureAsMessageEvent(bmqpEvent);

implPtr->addCorrelationId(bmqt::CorrelationId());
implPtr->addContext(bmqt::CorrelationId());

bmqa::MessageEvent pushMsgEvt = event.messageEvent();
bmqa::MessageIterator mIter = pushMsgEvt.messageIterator();
Expand Down Expand Up @@ -329,7 +329,7 @@ static void test3_messageProperties()
true);

implPtr->configureAsMessageEvent(bmqpEvent);
implPtr->addCorrelationId(bmqt::CorrelationId());
implPtr->addContext(bmqt::CorrelationId());

bmqa::MessageEvent pushMsgEvt = event.messageEvent();
bmqa::MessageIterator mIter = pushMsgEvt.messageIterator();
Expand Down Expand Up @@ -493,7 +493,7 @@ static void test4_subscriptionHandle()
implPtr->configureAsMessageEvent(bmqpEvent);

implPtr->insertQueue(sId, queueSp);
implPtr->addCorrelationId(cId, sId);
implPtr->addContext(cId, sId);

bmqa::MessageEvent pushMsgEvt = event.messageEvent();
bmqa::MessageIterator mIter = pushMsgEvt.messageIterator();
Expand Down Expand Up @@ -547,7 +547,7 @@ static void test4_subscriptionHandle()
implPtr->configureAsMessageEvent(bmqpEvent);

implPtr->insertQueue(defaultSubscriptionId, queueSp);
implPtr->addCorrelationId(emptyCorrelationId, defaultSubscriptionId);
implPtr->addContext(emptyCorrelationId, defaultSubscriptionId);

bmqa::MessageEvent pushMsgEvt = event.messageEvent();
bmqa::MessageIterator mIter = pushMsgEvt.messageIterator();
Expand Down Expand Up @@ -592,7 +592,7 @@ static void test4_subscriptionHandle()
implPtr->configureAsMessageEvent(bmqpEvent);

implPtr->insertQueue(queueSp);
implPtr->addCorrelationId(cId);
implPtr->addContext(cId);

bmqa::MessageEvent putMsgEvt = event.messageEvent();
bmqa::MessageIterator mIter = putMsgEvt.messageIterator();
Expand Down Expand Up @@ -629,7 +629,7 @@ static void test4_subscriptionHandle()
implPtr->configureAsMessageEvent(bmqpEvent);

implPtr->insertQueue(queueSp);
implPtr->addCorrelationId(cId);
implPtr->addContext(cId);

bmqa::MessageEvent ackMsgEvt = event.messageEvent();
bmqa::MessageIterator mIter = ackMsgEvt.messageIterator();
Expand Down
4 changes: 2 additions & 2 deletions src/groups/bmq/bmqa/bmqa_messageevent.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ static void test2_ackMesageIteratorTest()
for (bsl::vector<AckData>::const_iterator i = messages.begin();
i != messages.end();
++i) {
eventImpl->addCorrelationId(bmqt::CorrelationId(i->d_corrId));
eventImpl->addContext(bmqt::CorrelationId(i->d_corrId));
}

bmqa::MessageEvent event;
Expand Down Expand Up @@ -266,7 +266,7 @@ static void test3_putMessageIteratorTest()
// 'bmqa::MessageEventBuilder::packMessage' so that the message iterator
// can access the correlationId for each message.
for (size_t i = 0; i < k_NUM_MSGS; ++i) {
eventImpl->addCorrelationId(bmqt::CorrelationId(i));
eventImpl->addContext(bmqt::CorrelationId(i));
}

bmqa::MessageEvent event;
Expand Down
16 changes: 11 additions & 5 deletions src/groups/bmq/bmqa/bmqa_messageiterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

// bmqa_messageiterator.cpp -*-C++-*-
#include <bmqa_messageiterator.h>

#include <bmqp_messageproperties.h>
#include <bmqscm_version.h>
// BMQ
#include <bmqa_queueid.h>
Expand Down Expand Up @@ -48,6 +48,7 @@ bool MessageIterator::nextMessage()
msgImpl.d_queueId = bmqa::QueueId();
msgImpl.d_correlationId.makeUnset();
msgImpl.d_subscriptionHandle = bmqt::SubscriptionHandle();
msgImpl.d_schema_sp.reset();

int rc = -1;

Expand Down Expand Up @@ -100,15 +101,20 @@ bool MessageIterator::nextMessage()
++d_impl.d_messageIndex;
BSLS_ASSERT_SAFE(d_impl.d_messageIndex <
d_impl.d_event_p->numCorrrelationIds());
msgImpl.d_correlationId = d_impl.d_event_p->correlationId(
d_impl.d_messageIndex);

const bmqimp::Event::MessageContext& context =
d_impl.d_event_p->context(d_impl.d_messageIndex);

msgImpl.d_correlationId = context.d_correlationId;

if (d_impl.d_event_p->rawEvent().isPushEvent()) {
const unsigned int subscriptionId =
d_impl.d_event_p->subscriptionId(d_impl.d_messageIndex);
const unsigned int subscriptionId = context.d_subscriptionHandleId;

msgImpl.d_subscriptionHandle = bmqt::SubscriptionHandle(
subscriptionId,
msgImpl.d_correlationId);

msgImpl.d_schema_sp = context.d_schema_sp;
}

return true; // RETURN
Expand Down
4 changes: 2 additions & 2 deletions src/groups/bmq/bmqa/bmqa_mocksession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ Event MockSessionUtil::createAckEvent(const bsl::vector<AckParams>& acks,
implPtr->configureAsMessageEvent(
bmqp::Event(ackBuilder.blob().get(), alloc, true));
for (size_t i = 0; i != acks.size(); ++i) {
implPtr->addCorrelationId(acks[i].d_correlationId);
implPtr->addContext(acks[i].d_correlationId);
}

return event;
Expand Down Expand Up @@ -368,7 +368,7 @@ Event MockSessionUtil::createPushEvent(
logic);
implPtr->insertQueue(bmqp::Protocol::k_DEFAULT_SUBSCRIPTION_ID,
queueImplPtr);
implPtr->addCorrelationId(bmqt::CorrelationId());
implPtr->addContext(bmqt::CorrelationId());
}

bmqp::Event bmqpEvent(pushBuilder.blob().get(), alloc, true);
Expand Down
20 changes: 3 additions & 17 deletions src/groups/bmq/bmqimp/bmqimp_brokersession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3657,21 +3657,7 @@ void BrokerSession::processPushEvent(const bmqp::Event& event)
sIds.begin();
citer != sIds.end();
++citer) {
bmqt::CorrelationId correlationId;
unsigned int subscriptionHandleId;
const QueueManager::QueueSp queue =
d_queueManager.observePushEvent(&correlationId,
&subscriptionHandleId,
*citer);

BSLS_ASSERT(queue);
queueEvent->insertQueue(citer->d_subscriptionId, queue);

// Use 'subscriptionHandle' instead of the internal
// 'citer->d_subscriptionId' so that
// 'bmqimp::Event::subscriptionId()' returns 'subscriptionHandle'

queueEvent->addCorrelationId(correlationId, subscriptionHandleId);
d_queueManager.observePushEvent(queueEvent.get(), *citer);
}

// Update event bytes
Expand Down Expand Up @@ -3768,7 +3754,7 @@ void BrokerSession::processAckEvent(const bmqp::Event& event)
}

// Keep track of user-provided CorrelationId (it may be unset)
queueEvent->addCorrelationId(correlationId);
queueEvent->addContext(correlationId);

// Insert queue into event
queueEvent->insertQueue(queue);
Expand Down Expand Up @@ -4674,7 +4660,7 @@ bool BrokerSession::cancelPendingMessageImp(
}

// Keep track of user-provided CorrelationId (it may be unset)
(*ackEvent)->addCorrelationId(qac.d_correlationId);
(*ackEvent)->addContext(qac.d_correlationId);

// Insert queue into event
bsl::shared_ptr<Queue> queue = queueSp;
Expand Down
12 changes: 6 additions & 6 deletions src/groups/bmq/bmqimp/bmqimp_brokersession.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6835,7 +6835,7 @@ static void test33_queueNackTest()
BMQTST_ASSERT_EQ(pQueue->id(), iter->message().queueId());
BMQTST_ASSERT_EQ(k_ACK_STATUS_UNKNOWN, iter->message().status());
BMQTST_ASSERT_EQ(1, nackEvent->numCorrrelationIds());
BMQTST_ASSERT_EQ(corrId, nackEvent->correlationId(0));
BMQTST_ASSERT_EQ(corrId, nackEvent->context(0).d_correlationId);
BMQTST_ASSERT_EQ(0, iter->next());

PVV_SAFE("Step 9. Waiting QUEUE_CLOSE_RESULT event...");
Expand Down Expand Up @@ -9252,7 +9252,7 @@ static void test50_putRetransmittingTest()
BMQTST_ASSERT_EQ(k_ACK_STATUS_SUCCESS, ackIter->message().status());
BMQTST_ASSERT_EQ(guidFirst, ackIter->message().messageGUID());
BMQTST_ASSERT_EQ(1, ackEvent->numCorrrelationIds());
BMQTST_ASSERT_EQ(corrIdFirst, ackEvent->correlationId(0));
BMQTST_ASSERT_EQ(corrIdFirst, ackEvent->context(0).d_correlationId);
BMQTST_ASSERT_EQ(0, ackIter->next());

PVV_SAFE("Step 5. Trigger channel drop and verify no NACK event is sent");
Expand Down Expand Up @@ -9324,13 +9324,13 @@ static void test50_putRetransmittingTest()
BMQTST_ASSERT_EQ(guidSecond, iter->message().messageGUID());
BMQTST_ASSERT_EQ(k_ACK_STATUS_UNKNOWN, iter->message().status());
BMQTST_ASSERT_EQ(2, nackEvent->numCorrrelationIds());
BMQTST_ASSERT_EQ(corrIdSecond, nackEvent->correlationId(0));
BMQTST_ASSERT_EQ(corrIdSecond, nackEvent->context(0).d_correlationId);

BMQTST_ASSERT_EQ(1, iter->next());
BMQTST_ASSERT_EQ(pQueue->id(), iter->message().queueId());
BMQTST_ASSERT_EQ(guidThird, iter->message().messageGUID());
BMQTST_ASSERT_EQ(k_ACK_STATUS_UNKNOWN, iter->message().status());
BMQTST_ASSERT_EQ(corrIdThird, nackEvent->correlationId(1));
BMQTST_ASSERT_EQ(corrIdThird, nackEvent->context(1).d_correlationId);
BMQTST_ASSERT_EQ(0, iter->next());

PVV_SAFE("Step 11. Send one more PUT message while the channel is down");
Expand Down Expand Up @@ -9376,7 +9376,7 @@ static void test50_putRetransmittingTest()
BMQTST_ASSERT_EQ(guidFourth, iter->message().messageGUID());
BMQTST_ASSERT_EQ(k_ACK_STATUS_UNKNOWN, iter->message().status());
BMQTST_ASSERT_EQ(1, nackEvent->numCorrrelationIds());
BMQTST_ASSERT_EQ(corrIdFourth, nackEvent->correlationId(0));
BMQTST_ASSERT_EQ(corrIdFourth, nackEvent->context(0).d_correlationId);
BMQTST_ASSERT_EQ(0, iter->next());

PVV_SAFE("Step 14. Waiting QUEUE_CLOSE_RESULT event...");
Expand Down Expand Up @@ -9610,7 +9610,7 @@ static void test51_putRetransmittingNoAckTest()
BMQTST_ASSERT_EQ(guid2, iter->message().messageGUID());
BMQTST_ASSERT_EQ(k_ACK_STATUS_UNKNOWN, iter->message().status());
BMQTST_ASSERT_EQ(1, nackEvent->numCorrrelationIds());
BMQTST_ASSERT_EQ(corrId, nackEvent->correlationId(0));
BMQTST_ASSERT_EQ(corrId, nackEvent->context(0).d_correlationId);
BMQTST_ASSERT_EQ(0, iter->next());

PVV_SAFE("Step 13. Waiting QUEUE_CLOSE_RESULT event...");
Expand Down
12 changes: 6 additions & 6 deletions src/groups/bmq/bmqimp/bmqimp_event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ Event::Event(bdlbb::BlobBufferFactory* blobBufferFactory,
, d_putMsgIter(blobBufferFactory, allocator)
, d_putEventBuilderBuffer()
, d_isPutEventBuilderConstructed(false)
, d_correlationIds(allocator)
, d_contexts(allocator)

{
// NOTHING
Expand All @@ -83,7 +83,7 @@ Event::Event(const Event& other, bslma::Allocator* allocator)
, d_putMsgIter(other.d_bufferFactory_p, allocator)
, d_putEventBuilderBuffer()
, d_isPutEventBuilderConstructed(false)
, d_correlationIds(other.d_correlationIds, allocator)
, d_contexts(other.d_contexts, allocator)

{
// PRECONDITIONS
Expand Down Expand Up @@ -156,7 +156,7 @@ Event& Event::operator=(const Event& rhs)
d_correlationId = rhs.d_correlationId;
d_errorDescription = rhs.d_errorDescription;
d_msgEventMode = rhs.d_msgEventMode;
d_correlationIds = rhs.d_correlationIds;
d_contexts = rhs.d_contexts;

if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(d_type ==
EventType::e_SESSION)) {
Expand Down Expand Up @@ -224,7 +224,7 @@ void Event::reset()
d_pushMsgIter.clear();
d_ackMsgIter.clear();
d_putMsgIter.clear();
d_correlationIds.clear();
d_contexts.clear();
}

void Event::clear()
Expand Down Expand Up @@ -356,7 +356,7 @@ Event& Event::upgradeMessageEventModeToWrite()
d_rawEvent.clear();
d_queues.clear();
d_queuesBySubscriptionId.clear();
d_correlationIds.clear();
d_contexts.clear();
d_correlationId.makeUnset();
return *this;
}
Expand Down Expand Up @@ -398,7 +398,7 @@ void Event::addMessageInfo(const bsl::shared_ptr<Queue>& queue,
// Add correlationId (even if it's empty) to the event's list. It is
// used by the message iterator to access correlationId by the message
// index.
addCorrelationId(corrId);
addContext(corrId);

// Insert correlationId and queueId and into correlationIds maps of
// correlationId container.
Expand Down
Loading
Loading