Skip to content

Commit f0622b7

Browse files
committed
learn schema while iterating event
Signed-off-by: dorjesinpo <[email protected]>
1 parent 4daa9d2 commit f0622b7

6 files changed

+86
-42
lines changed

Diff for: src/groups/bmq/bmqimp/bmqimp_queuemanager.cpp

+35-11
Original file line numberDiff line numberDiff line change
@@ -314,12 +314,7 @@ void QueueManager::observePushEvent(Event* queueEvent,
314314

315315
queueEvent->insertQueue(info.d_subscriptionId, queue);
316316

317-
bmqp::MessageProperties::SchemaPtr schema = schemaLearner().learn(
318-
schemaLearner().createContext(queueId),
319-
bmqp::MessagePropertiesInfo(info.d_header),
320-
bdlbb::Blob());
321-
322-
queueEvent->addContext(correlationId, subscriptionHandleId, schema);
317+
queueEvent->addContext(correlationId, subscriptionHandleId, info.d_schema);
323318
}
324319

325320
int QueueManager::onPushEvent(QueueManager::EventInfos* eventInfos,
@@ -367,6 +362,33 @@ int QueueManager::onPushEvent(QueueManager::EventInfos* eventInfos,
367362
BSLS_PERFORMANCEHINT_UNLIKELY_HINT;
368363
return rc * 10 + rc_OPTIONS_LOAD_ERROR; // RETURN
369364
}
365+
366+
const bmqp::PushHeader& header = msgIterator.header();
367+
int queueId = header.queueId();
368+
const bmqp::MessagePropertiesInfo input(header);
369+
370+
bmqp::MessageProperties::SchemaPtr* schemaHolder =
371+
schemaLearner().observe(d_schemaLearner.createContext(queueId),
372+
input);
373+
374+
bmqp::MessageProperties::SchemaPtr schema;
375+
376+
if (schemaHolder) {
377+
schema = *schemaHolder;
378+
if (!schema) {
379+
// Learn new Schema by reading all MessageProperties.
380+
bdlbb::Blob appData(d_allocator_p);
381+
if (msgIterator.loadApplicationData(&appData)) {
382+
bmqp::MessageProperties mps(d_allocator_p);
383+
384+
if (mps.streamIn(appData, input.isExtended()) == 0) {
385+
// Learn new schema.
386+
*schemaHolder = schema = mps.makeSchema(d_allocator_p);
387+
}
388+
}
389+
}
390+
}
391+
370392
eventInfos->resize(eventInfos->size() + 1);
371393
if ((optionsView.find(bmqp::OptionType::e_SUB_QUEUE_INFOS) !=
372394
optionsView.end()) ||
@@ -388,10 +410,11 @@ int QueueManager::onPushEvent(QueueManager::EventInfos* eventInfos,
388410
*hasMessageWithMultipleSubQueueIds = true;
389411
}
390412
else {
391-
eventInfos->back().d_ids.push_back(bmqp::EventUtilQueueInfo(
392-
subQueueInfos[0].id(),
393-
msgIterator.header(),
394-
msgIterator.applicationDataSize()));
413+
eventInfos->back().d_ids.push_back(
414+
bmqp::EventUtilQueueInfo(subQueueInfos[0].id(),
415+
msgIterator.header(),
416+
msgIterator.applicationDataSize(),
417+
schema));
395418
}
396419

397420
// Update message count
@@ -401,7 +424,8 @@ int QueueManager::onPushEvent(QueueManager::EventInfos* eventInfos,
401424
eventInfos->back().d_ids.push_back(bmqp::EventUtilQueueInfo(
402425
bmqp::Protocol::k_DEFAULT_SUBSCRIPTION_ID,
403426
msgIterator.header(),
404-
msgIterator.applicationDataSize()));
427+
msgIterator.applicationDataSize(),
428+
schema));
405429

406430
// Update message count
407431
++(*messageCount);

Diff for: src/groups/bmq/bmqp/bmqp_eventutil.cpp

+33-3
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,8 @@ class Flattener {
115115
OptionsView d_optionsView;
116116
// Helps go through Options
117117

118+
bmqp::SchemaLearner* d_schemaLearner;
119+
118120
// PRIVATE TYPES
119121
typedef bdlma::LocalSequentialAllocator<
120122
32 * sizeof(Protocol::SubQueueIdsArrayOld::value_type)>
@@ -187,7 +189,8 @@ class Flattener {
187189
const Event& event,
188190
bdlbb::BlobBufferFactory* bufferFactory,
189191
BlobSpPool* blobSpPool_p,
190-
bslma::Allocator* allocator);
192+
bslma::Allocator* allocator,
193+
bmqp::SchemaLearner* schemaLearner = 0);
191194

192195
// MANIPULATORS
193196

@@ -372,13 +375,38 @@ Flattener::packMesage(const Protocol::SubQueueInfosArray& subQInfo)
372375
return result; // RETURN
373376
}
374377

378+
int queueId = header.queueId();
379+
const bmqp::MessagePropertiesInfo input(header);
380+
381+
bmqp::MessageProperties::SchemaPtr schema;
382+
383+
if (d_schemaLearner) {
384+
bmqp::MessageProperties::SchemaPtr* schemaHolder =
385+
d_schemaLearner->observe(d_schemaLearner->createContext(queueId),
386+
input);
387+
388+
if (schemaHolder) {
389+
schema = *schemaHolder;
390+
if (!schema) {
391+
// Learn new Schema by reading all MessageProperties.
392+
bmqp::MessageProperties mps(d_allocator_p);
393+
394+
if (mps.streamIn(d_appData, input.isExtended()) == 0) {
395+
// Learn new schema.
396+
*schemaHolder = schema = mps.makeSchema(d_allocator_p);
397+
}
398+
}
399+
}
400+
}
401+
375402
// Successfully packed the current message and associated it with the
376403
// current SubQueueId being processed. Add the corresponding queueId for
377404
// this message associated with the current flattened event.
378405
d_currEventInfo.d_ids.push_back(
379406
bmqp::EventUtilQueueInfo(subQInfo[0].id(),
380407
header,
381-
d_msgIterator.applicationDataSize()));
408+
d_msgIterator.applicationDataSize(),
409+
schema));
382410

383411
return result;
384412
}
@@ -404,14 +432,16 @@ Flattener::Flattener(bsl::vector<EventUtilEventInfo>* eventInfos,
404432
const Event& event,
405433
bdlbb::BlobBufferFactory* bufferFactory,
406434
BlobSpPool* blobSpPool_p,
407-
bslma::Allocator* allocator)
435+
bslma::Allocator* allocator,
436+
bmqp::SchemaLearner* schemaLearner)
408437
: d_eventInfos_p(eventInfos)
409438
, d_allocator_p(allocator)
410439
, d_builder(blobSpPool_p, allocator)
411440
, d_msgIterator(bufferFactory, allocator)
412441
, d_currEventInfo(allocator)
413442
, d_appData(bufferFactory, allocator)
414443
, d_optionsView(allocator)
444+
, d_schemaLearner(schemaLearner)
415445
{
416446
event.loadPushMessageIterator(&d_msgIterator);
417447
BSLS_ASSERT_SAFE(d_msgIterator.isValid());

Diff for: src/groups/bmq/bmqp/bmqp_eventutil.h

+11-4
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
#include <bmqp_blobpoolutil.h>
3434
#include <bmqp_protocol.h>
3535
#include <bmqp_queueid.h>
36+
#include <bmqp_schemalearner.h>
3637

3738
// BDE
3839
#include <bdlbb_blob.h>
@@ -65,9 +66,12 @@ struct EventUtilQueueInfo {
6566
const bmqp::PushHeader d_header;
6667
const int d_applicationDataSize;
6768

69+
const bmqp::MessageProperties::SchemaPtr d_schema;
70+
6871
EventUtilQueueInfo(unsigned int subscriptionId,
6972
const bmqp::PushHeader& header,
70-
int applicationDataSize);
73+
int applicationDataSize,
74+
const bmqp::MessageProperties::SchemaPtr schema);
7175
};
7276
// =========================
7377
// struct EventUtilEventInfo
@@ -146,12 +150,15 @@ struct EventUtil {
146150
// struct EventUtilQueueInfo
147151
// --------------------------
148152

149-
inline EventUtilQueueInfo::EventUtilQueueInfo(unsigned int subscriptionId,
150-
const bmqp::PushHeader& header,
151-
int appDataSize)
153+
inline EventUtilQueueInfo::EventUtilQueueInfo(
154+
unsigned int subscriptionId,
155+
const bmqp::PushHeader& header,
156+
int appDataSize,
157+
const bmqp::MessageProperties::SchemaPtr schema)
152158
: d_subscriptionId(subscriptionId)
153159
, d_header(header)
154160
, d_applicationDataSize(appDataSize)
161+
, d_schema(schema)
155162
{
156163
// NOTHING
157164
}

Diff for: src/groups/bmq/bmqp/bmqp_schemalearner.cpp

+4-18
Original file line numberDiff line numberDiff line change
@@ -138,16 +138,14 @@ SchemaLearner::Context SchemaLearner::createContext(int foreignId)
138138
return context->second;
139139
}
140140

141-
SchemaLearner::SchemaPtr
142-
SchemaLearner::learn(Context& context,
143-
const MessagePropertiesInfo& input,
144-
const bdlbb::Blob& blob)
141+
SchemaLearner::SchemaPtr*
142+
SchemaLearner::observe(Context& context, const MessagePropertiesInfo& input)
145143
{
146144
const SchemaIdType inputId = input.schemaId();
147145

148146
if (!isPresentAndValid(inputId)) {
149147
// Nothing to do
150-
return SchemaPtr(); // RETURN
148+
return 0; // RETURN
151149
}
152150

153151
BSLS_ASSERT_SAFE(context);
@@ -171,19 +169,7 @@ SchemaLearner::learn(Context& context,
171169
// Must destroy previously learned Schema
172170
}
173171

174-
const SchemaPtr& result = contextHandle->d_schema_sp;
175-
176-
if (!result) {
177-
// Learn new Schema by reading all MessageProperties.
178-
MessageProperties mps(d_allocator_p);
179-
int rc = mps.streamIn(blob, input, result);
180-
181-
if (rc == 0) {
182-
// Learn new schema.
183-
contextHandle->d_schema_sp = mps.makeSchema(d_allocator_p);
184-
}
185-
}
186-
return result;
172+
return &contextHandle->d_schema_sp;
187173
}
188174

189175
MessagePropertiesInfo

Diff for: src/groups/bmq/bmqp/bmqp_schemalearner.h

+2-5
Original file line numberDiff line numberDiff line change
@@ -215,11 +215,8 @@ class SchemaLearner {
215215

216216
/// If the specified `input` indicates recycling, reset previously learned
217217
/// schema accumulated in the specified `context` and associated with the
218-
/// id in `input`. Return the previously learned schema if it is valid,
219-
/// learn and return new schema otherwise.
220-
SchemaPtr learn(Context& context,
221-
const MessagePropertiesInfo& input,
222-
const bdlbb::Blob& blob);
218+
/// id in `input`. Return the address of the corresponding schema holder.
219+
SchemaPtr* observe(Context& context, const MessagePropertiesInfo& input);
223220

224221
/// Return `MessagePropertiesLogic` with a unique id associated with the
225222
/// specified `context` and the id in the specified `input`. If there

Diff for: src/groups/bmq/bmqp/bmqp_schemalearner.t.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ static void test3_observingTest()
209209

210210
bmqp::MessagePropertiesInfo recycledInput(true, 1, true);
211211

212-
theLearner.learn(server, recycledInput, bdlbb::Blob());
212+
theLearner.observe(server, recycledInput);
213213

214214
BMQTST_ASSERT_EQ(0, theLearner.read(server, &out2, input, blob));
215215

0 commit comments

Comments
 (0)