Skip to content

Commit 91d01b5

Browse files
committed
Fixing thread-unsafe SchemaLeaner use
Signed-off-by: dorjesinpo <[email protected]>
1 parent 2bda5bd commit 91d01b5

21 files changed

+183
-149
lines changed

Diff for: src/groups/bmq/bmqa/bmqa_message.cpp

+12-14
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
#include <bmqimp_queue.h>
2525
#include <bmqp_event.h>
2626
#include <bmqp_eventutil.h>
27-
#include <bmqp_messageproperties.h>
2827
#include <bmqp_protocolutil.h>
2928
#include <bmqp_queueid.h>
3029
#include <bmqt_resultcode.h>
@@ -497,10 +496,11 @@ int Message::loadProperties(MessageProperties* buffer) const
497496
BSLS_ASSERT_SAFE(isInitialized());
498497
BSLS_ASSERT_SAFE(buffer);
499498

500-
bmqp::MessageProperties** propertiesImpl =
501-
reinterpret_cast<bmqp::MessageProperties**>(buffer);
499+
bmqp::MessageProperties* propertiesImpl =
500+
*reinterpret_cast<bmqp::MessageProperties**>(buffer);
502501

503-
const bmqp::Event& rawEvent = d_impl.d_event_p->rawEvent();
502+
bmqimp::Event* event = d_impl.d_event_p;
503+
const bmqp::Event& rawEvent = event->rawEvent();
504504
int rc = -1;
505505

506506
if (rawEvent.isPushEvent()) {
@@ -509,32 +509,30 @@ int Message::loadProperties(MessageProperties* buffer) const
509509
d_impl.d_queueId);
510510

511511
if (queue->id() == bmqimp::Queue::k_INVALID_QUEUE_ID) {
512-
queue = d_impl.d_event_p->lookupQueue();
512+
queue = event->lookupQueue();
513513
}
514514
BSLS_ASSERT_SAFE(queue);
515515

516516
bdlbb::Blob propertiesBlob;
517-
const bmqp::PushMessageIterator& it =
518-
*d_impl.d_event_p->pushMessageIterator();
517+
const bmqp::PushMessageIterator& it = *event->pushMessageIterator();
519518
it.loadMessageProperties(&propertiesBlob);
520519

521520
bmqp::MessagePropertiesInfo input(it.header());
522521

523-
rc = queue->schemaLearner().read(queue->schemaLearnerContext(),
524-
*propertiesImpl,
525-
input,
526-
propertiesBlob);
522+
rc = propertiesImpl->streamIn(propertiesBlob,
523+
input,
524+
d_impl.d_schema_sp);
527525

528526
// Forcibly load all properties.
529527
// REVISIT; this can be removed once MPs support dynamic modification
530528
if (rc == 0) {
531529
rc = 1000 *
532-
(*propertiesImpl)->loadProperties(false, input.isExtended());
530+
propertiesImpl->loadProperties(false, input.isExtended());
533531
}
534532
}
535533
else if (rawEvent.isPutEvent()) {
536-
rc = d_impl.d_event_p->putMessageIterator()->loadMessageProperties(
537-
*propertiesImpl);
534+
rc = event->putMessageIterator()->loadMessageProperties(
535+
propertiesImpl);
538536
// Schema learning for PUSHs only, not for PUTs assuming 'Message'
539537
// builds PUTs and receives PUSHs.
540538
}

Diff for: src/groups/bmq/bmqa/bmqa_message.h

+3
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
// BMQ
6161

6262
#include <bmqa_queueid.h>
63+
#include <bmqp_messageproperties.h>
6364
#include <bmqt_compressionalgorithmtype.h>
6465
#include <bmqt_correlationid.h>
6566
#include <bmqt_messageguid.h>
@@ -117,6 +118,8 @@ struct MessageImpl {
117118
/// SubscriptionHandle this message is associated with
118119
bmqt::SubscriptionHandle d_subscriptionHandle;
119120

121+
bmqp::MessageProperties::SchemaPtr d_schema_sp;
122+
120123
#ifdef BMQ_ENABLE_MSG_GROUPID
121124
/// Optional GroupId this message is associated with
122125
bsl::string d_groupId;

Diff for: src/groups/bmq/bmqa/bmqa_message.t.cpp

+6-6
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ static void test2_validPushMessagePrint()
226226
true);
227227
implPtr->configureAsMessageEvent(bmqpEvent);
228228

229-
implPtr->addCorrelationId(bmqt::CorrelationId());
229+
implPtr->addContext(bmqt::CorrelationId());
230230

231231
bmqa::MessageEvent pushMsgEvt = event.messageEvent();
232232
bmqa::MessageIterator mIter = pushMsgEvt.messageIterator();
@@ -329,7 +329,7 @@ static void test3_messageProperties()
329329
true);
330330

331331
implPtr->configureAsMessageEvent(bmqpEvent);
332-
implPtr->addCorrelationId(bmqt::CorrelationId());
332+
implPtr->addContext(bmqt::CorrelationId());
333333

334334
bmqa::MessageEvent pushMsgEvt = event.messageEvent();
335335
bmqa::MessageIterator mIter = pushMsgEvt.messageIterator();
@@ -493,7 +493,7 @@ static void test4_subscriptionHandle()
493493
implPtr->configureAsMessageEvent(bmqpEvent);
494494

495495
implPtr->insertQueue(sId, queueSp);
496-
implPtr->addCorrelationId(cId, sId);
496+
implPtr->addContext(cId, sId);
497497

498498
bmqa::MessageEvent pushMsgEvt = event.messageEvent();
499499
bmqa::MessageIterator mIter = pushMsgEvt.messageIterator();
@@ -547,7 +547,7 @@ static void test4_subscriptionHandle()
547547
implPtr->configureAsMessageEvent(bmqpEvent);
548548

549549
implPtr->insertQueue(defaultSubscriptionId, queueSp);
550-
implPtr->addCorrelationId(emptyCorrelationId, defaultSubscriptionId);
550+
implPtr->addContext(emptyCorrelationId, defaultSubscriptionId);
551551

552552
bmqa::MessageEvent pushMsgEvt = event.messageEvent();
553553
bmqa::MessageIterator mIter = pushMsgEvt.messageIterator();
@@ -592,7 +592,7 @@ static void test4_subscriptionHandle()
592592
implPtr->configureAsMessageEvent(bmqpEvent);
593593

594594
implPtr->insertQueue(queueSp);
595-
implPtr->addCorrelationId(cId);
595+
implPtr->addContext(cId);
596596

597597
bmqa::MessageEvent putMsgEvt = event.messageEvent();
598598
bmqa::MessageIterator mIter = putMsgEvt.messageIterator();
@@ -629,7 +629,7 @@ static void test4_subscriptionHandle()
629629
implPtr->configureAsMessageEvent(bmqpEvent);
630630

631631
implPtr->insertQueue(queueSp);
632-
implPtr->addCorrelationId(cId);
632+
implPtr->addContext(cId);
633633

634634
bmqa::MessageEvent ackMsgEvt = event.messageEvent();
635635
bmqa::MessageIterator mIter = ackMsgEvt.messageIterator();

Diff for: src/groups/bmq/bmqa/bmqa_messageevent.t.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ static void test2_ackMesageIteratorTest()
190190
for (bsl::vector<AckData>::const_iterator i = messages.begin();
191191
i != messages.end();
192192
++i) {
193-
eventImpl->addCorrelationId(bmqt::CorrelationId(i->d_corrId));
193+
eventImpl->addContext(bmqt::CorrelationId(i->d_corrId));
194194
}
195195

196196
bmqa::MessageEvent event;
@@ -266,7 +266,7 @@ static void test3_putMessageIteratorTest()
266266
// 'bmqa::MessageEventBuilder::packMessage' so that the message iterator
267267
// can access the correlationId for each message.
268268
for (size_t i = 0; i < k_NUM_MSGS; ++i) {
269-
eventImpl->addCorrelationId(bmqt::CorrelationId(i));
269+
eventImpl->addContext(bmqt::CorrelationId(i));
270270
}
271271

272272
bmqa::MessageEvent event;

Diff for: src/groups/bmq/bmqa/bmqa_messageiterator.cpp

+10-4
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ bool MessageIterator::nextMessage()
4848
msgImpl.d_queueId = bmqa::QueueId();
4949
msgImpl.d_correlationId.makeUnset();
5050
msgImpl.d_subscriptionHandle = bmqt::SubscriptionHandle();
51+
msgImpl.d_schema_sp.reset();
5152

5253
int rc = -1;
5354

@@ -100,15 +101,20 @@ bool MessageIterator::nextMessage()
100101
++d_impl.d_messageIndex;
101102
BSLS_ASSERT_SAFE(d_impl.d_messageIndex <
102103
d_impl.d_event_p->numCorrrelationIds());
103-
msgImpl.d_correlationId = d_impl.d_event_p->correlationId(
104-
d_impl.d_messageIndex);
104+
105+
const bmqimp::Event::MessageContext& context =
106+
d_impl.d_event_p->context(d_impl.d_messageIndex);
107+
108+
msgImpl.d_correlationId = context.d_correlationId;
105109

106110
if (d_impl.d_event_p->rawEvent().isPushEvent()) {
107-
const unsigned int subscriptionId =
108-
d_impl.d_event_p->subscriptionId(d_impl.d_messageIndex);
111+
const unsigned int subscriptionId = context.d_subscriptionHandleId;
112+
109113
msgImpl.d_subscriptionHandle = bmqt::SubscriptionHandle(
110114
subscriptionId,
111115
msgImpl.d_correlationId);
116+
117+
msgImpl.d_schema_sp = context.d_schema_sp;
112118
}
113119

114120
return true; // RETURN

Diff for: src/groups/bmq/bmqa/bmqa_mocksession.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,7 @@ Event MockSessionUtil::createAckEvent(const bsl::vector<AckParams>& acks,
312312
implPtr->configureAsMessageEvent(
313313
bmqp::Event(ackBuilder.blob().get(), alloc, true));
314314
for (size_t i = 0; i != acks.size(); ++i) {
315-
implPtr->addCorrelationId(acks[i].d_correlationId);
315+
implPtr->addContext(acks[i].d_correlationId);
316316
}
317317

318318
return event;
@@ -368,7 +368,7 @@ Event MockSessionUtil::createPushEvent(
368368
logic);
369369
implPtr->insertQueue(bmqp::Protocol::k_DEFAULT_SUBSCRIPTION_ID,
370370
queueImplPtr);
371-
implPtr->addCorrelationId(bmqt::CorrelationId());
371+
implPtr->addContext(bmqt::CorrelationId());
372372
}
373373

374374
bmqp::Event bmqpEvent(pushBuilder.blob().get(), alloc, true);

Diff for: src/groups/bmq/bmqimp/bmqimp_brokersession.cpp

+3-17
Original file line numberDiff line numberDiff line change
@@ -3657,21 +3657,7 @@ void BrokerSession::processPushEvent(const bmqp::Event& event)
36573657
sIds.begin();
36583658
citer != sIds.end();
36593659
++citer) {
3660-
bmqt::CorrelationId correlationId;
3661-
unsigned int subscriptionHandleId;
3662-
const QueueManager::QueueSp queue =
3663-
d_queueManager.observePushEvent(&correlationId,
3664-
&subscriptionHandleId,
3665-
*citer);
3666-
3667-
BSLS_ASSERT(queue);
3668-
queueEvent->insertQueue(citer->d_subscriptionId, queue);
3669-
3670-
// Use 'subscriptionHandle' instead of the internal
3671-
// 'citer->d_subscriptionId' so that
3672-
// 'bmqimp::Event::subscriptionId()' returns 'subscriptionHandle'
3673-
3674-
queueEvent->addCorrelationId(correlationId, subscriptionHandleId);
3660+
d_queueManager.observePushEvent(queueEvent.get(), *citer);
36753661
}
36763662

36773663
// Update event bytes
@@ -3768,7 +3754,7 @@ void BrokerSession::processAckEvent(const bmqp::Event& event)
37683754
}
37693755

37703756
// Keep track of user-provided CorrelationId (it may be unset)
3771-
queueEvent->addCorrelationId(correlationId);
3757+
queueEvent->addContext(correlationId);
37723758

37733759
// Insert queue into event
37743760
queueEvent->insertQueue(queue);
@@ -4674,7 +4660,7 @@ bool BrokerSession::cancelPendingMessageImp(
46744660
}
46754661

46764662
// Keep track of user-provided CorrelationId (it may be unset)
4677-
(*ackEvent)->addCorrelationId(qac.d_correlationId);
4663+
(*ackEvent)->addContext(qac.d_correlationId);
46784664

46794665
// Insert queue into event
46804666
bsl::shared_ptr<Queue> queue = queueSp;

Diff for: src/groups/bmq/bmqimp/bmqimp_brokersession.t.cpp

+6-6
Original file line numberDiff line numberDiff line change
@@ -6835,7 +6835,7 @@ static void test33_queueNackTest()
68356835
BMQTST_ASSERT_EQ(pQueue->id(), iter->message().queueId());
68366836
BMQTST_ASSERT_EQ(k_ACK_STATUS_UNKNOWN, iter->message().status());
68376837
BMQTST_ASSERT_EQ(1, nackEvent->numCorrrelationIds());
6838-
BMQTST_ASSERT_EQ(corrId, nackEvent->correlationId(0));
6838+
BMQTST_ASSERT_EQ(corrId, nackEvent->context(0).d_correlationId);
68396839
BMQTST_ASSERT_EQ(0, iter->next());
68406840

68416841
PVV_SAFE("Step 9. Waiting QUEUE_CLOSE_RESULT event...");
@@ -9252,7 +9252,7 @@ static void test50_putRetransmittingTest()
92529252
BMQTST_ASSERT_EQ(k_ACK_STATUS_SUCCESS, ackIter->message().status());
92539253
BMQTST_ASSERT_EQ(guidFirst, ackIter->message().messageGUID());
92549254
BMQTST_ASSERT_EQ(1, ackEvent->numCorrrelationIds());
9255-
BMQTST_ASSERT_EQ(corrIdFirst, ackEvent->correlationId(0));
9255+
BMQTST_ASSERT_EQ(corrIdFirst, ackEvent->context(0).d_correlationId);
92569256
BMQTST_ASSERT_EQ(0, ackIter->next());
92579257

92589258
PVV_SAFE("Step 5. Trigger channel drop and verify no NACK event is sent");
@@ -9324,13 +9324,13 @@ static void test50_putRetransmittingTest()
93249324
BMQTST_ASSERT_EQ(guidSecond, iter->message().messageGUID());
93259325
BMQTST_ASSERT_EQ(k_ACK_STATUS_UNKNOWN, iter->message().status());
93269326
BMQTST_ASSERT_EQ(2, nackEvent->numCorrrelationIds());
9327-
BMQTST_ASSERT_EQ(corrIdSecond, nackEvent->correlationId(0));
9327+
BMQTST_ASSERT_EQ(corrIdSecond, nackEvent->context(0).d_correlationId);
93289328

93299329
BMQTST_ASSERT_EQ(1, iter->next());
93309330
BMQTST_ASSERT_EQ(pQueue->id(), iter->message().queueId());
93319331
BMQTST_ASSERT_EQ(guidThird, iter->message().messageGUID());
93329332
BMQTST_ASSERT_EQ(k_ACK_STATUS_UNKNOWN, iter->message().status());
9333-
BMQTST_ASSERT_EQ(corrIdThird, nackEvent->correlationId(1));
9333+
BMQTST_ASSERT_EQ(corrIdThird, nackEvent->context(1).d_correlationId);
93349334
BMQTST_ASSERT_EQ(0, iter->next());
93359335

93369336
PVV_SAFE("Step 11. Send one more PUT message while the channel is down");
@@ -9376,7 +9376,7 @@ static void test50_putRetransmittingTest()
93769376
BMQTST_ASSERT_EQ(guidFourth, iter->message().messageGUID());
93779377
BMQTST_ASSERT_EQ(k_ACK_STATUS_UNKNOWN, iter->message().status());
93789378
BMQTST_ASSERT_EQ(1, nackEvent->numCorrrelationIds());
9379-
BMQTST_ASSERT_EQ(corrIdFourth, nackEvent->correlationId(0));
9379+
BMQTST_ASSERT_EQ(corrIdFourth, nackEvent->context(0).d_correlationId);
93809380
BMQTST_ASSERT_EQ(0, iter->next());
93819381

93829382
PVV_SAFE("Step 14. Waiting QUEUE_CLOSE_RESULT event...");
@@ -9610,7 +9610,7 @@ static void test51_putRetransmittingNoAckTest()
96109610
BMQTST_ASSERT_EQ(guid2, iter->message().messageGUID());
96119611
BMQTST_ASSERT_EQ(k_ACK_STATUS_UNKNOWN, iter->message().status());
96129612
BMQTST_ASSERT_EQ(1, nackEvent->numCorrrelationIds());
9613-
BMQTST_ASSERT_EQ(corrId, nackEvent->correlationId(0));
9613+
BMQTST_ASSERT_EQ(corrId, nackEvent->context(0).d_correlationId);
96149614
BMQTST_ASSERT_EQ(0, iter->next());
96159615

96169616
PVV_SAFE("Step 13. Waiting QUEUE_CLOSE_RESULT event...");

Diff for: src/groups/bmq/bmqimp/bmqimp_event.cpp

+6-6
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ Event::Event(bdlbb::BlobBufferFactory* blobBufferFactory,
5757
, d_putMsgIter(blobBufferFactory, allocator)
5858
, d_putEventBuilderBuffer()
5959
, d_isPutEventBuilderConstructed(false)
60-
, d_correlationIds(allocator)
60+
, d_contexts(allocator)
6161

6262
{
6363
// NOTHING
@@ -83,7 +83,7 @@ Event::Event(const Event& other, bslma::Allocator* allocator)
8383
, d_putMsgIter(other.d_bufferFactory_p, allocator)
8484
, d_putEventBuilderBuffer()
8585
, d_isPutEventBuilderConstructed(false)
86-
, d_correlationIds(other.d_correlationIds, allocator)
86+
, d_contexts(other.d_contexts, allocator)
8787

8888
{
8989
// PRECONDITIONS
@@ -156,7 +156,7 @@ Event& Event::operator=(const Event& rhs)
156156
d_correlationId = rhs.d_correlationId;
157157
d_errorDescription = rhs.d_errorDescription;
158158
d_msgEventMode = rhs.d_msgEventMode;
159-
d_correlationIds = rhs.d_correlationIds;
159+
d_contexts = rhs.d_contexts;
160160

161161
if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(d_type ==
162162
EventType::e_SESSION)) {
@@ -224,7 +224,7 @@ void Event::reset()
224224
d_pushMsgIter.clear();
225225
d_ackMsgIter.clear();
226226
d_putMsgIter.clear();
227-
d_correlationIds.clear();
227+
d_contexts.clear();
228228
}
229229

230230
void Event::clear()
@@ -356,7 +356,7 @@ Event& Event::upgradeMessageEventModeToWrite()
356356
d_rawEvent.clear();
357357
d_queues.clear();
358358
d_queuesBySubscriptionId.clear();
359-
d_correlationIds.clear();
359+
d_contexts.clear();
360360
d_correlationId.makeUnset();
361361
return *this;
362362
}
@@ -398,7 +398,7 @@ void Event::addMessageInfo(const bsl::shared_ptr<Queue>& queue,
398398
// Add correlationId (even if it's empty) to the event's list. It is
399399
// used by the message iterator to access correlationId by the message
400400
// index.
401-
addCorrelationId(corrId);
401+
addContext(corrId);
402402

403403
// Insert correlationId and queueId and into correlationIds maps of
404404
// correlationId container.

0 commit comments

Comments
 (0)