Skip to content

Commit 1621b51

Browse files
committed
Perf[MQB]: separate BlobSpPools and BufferFactories per FileStore thread
Signed-off-by: Evgeny Malygin <[email protected]>
1 parent bff69f4 commit 1621b51

File tree

5 files changed

+65
-37
lines changed

5 files changed

+65
-37
lines changed

Diff for: src/groups/mqb/mqbc/mqbc_storagemanager.t.cpp

+6-1
Original file line numberDiff line numberDiff line change
@@ -867,6 +867,11 @@ struct TestHelper {
867867
const mqbcfg::PartitionConfig& partitionCfg =
868868
d_cluster_mp->_clusterDefinition().partitionConfig();
869869

870+
bmqp::BlobPoolUtil::BlobSpPoolSp blobSpPool_sp =
871+
bmqp::BlobPoolUtil::createBlobPool(
872+
&d_cluster_mp->_clusterData()->bufferFactory(),
873+
bmqtst::TestHelperUtil::allocator());
874+
870875
mqbs::DataStoreConfig dsCfg;
871876
dsCfg.setScheduler(&d_cluster_mp->_scheduler())
872877
.setBufferFactory(&d_cluster_mp->_clusterData()->bufferFactory())
@@ -894,7 +899,7 @@ struct TestHelper {
894899
d_cluster_mp->dispatcher(),
895900
&d_cluster_mp->netCluster(),
896901
&d_cluster_mp->_clusterData()->stats(),
897-
&d_cluster_mp->_clusterData()->blobSpPool(),
902+
blobSpPool_sp,
898903
&d_cluster_mp->_clusterData()->stateSpPool(),
899904
&threadPool,
900905
d_cluster_mp->isCSLModeEnabled(),

Diff for: src/groups/mqb/mqbc/mqbc_storageutil.cpp

+44-22
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
#include <bdlb_print.h>
4343
#include <bdlb_scopeexit.h>
4444
#include <bdlb_stringrefutil.h>
45+
#include <bdlbb_pooledblobbufferfactory.h>
4546
#include <bdlf_bind.h>
4647
#include <bdlf_memfn.h>
4748
#include <bdlma_localsequentialallocator.h>
@@ -1225,8 +1226,37 @@ int StorageUtil::assignPartitionDispatcherThreads(
12251226
mqbi::DispatcherClientType::e_QUEUE);
12261227
BSLS_ASSERT_SAFE(numProcessors > 0);
12271228

1229+
bslma::Allocator* baseAllocator = allocators->baseAllocator();
1230+
bsl::vector<bslma::Allocator*> queueThreadAllocators(numProcessors,
1231+
baseAllocator);
1232+
bsl::vector<bsl::shared_ptr<bdlbb::BlobBufferFactory> > bufferFactories(
1233+
numProcessors,
1234+
baseAllocator);
1235+
bsl::vector<bmqp::BlobPoolUtil::BlobSpPoolSp> blobSpPools(numProcessors,
1236+
baseAllocator);
1237+
1238+
// Allocater per-thread resources
1239+
for (int i = 0; i < numProcessors; ++i) {
1240+
bmqu::MemOutStream allocatorName(baseAllocator);
1241+
allocatorName << "QueueDispatcherThread" << i;
1242+
1243+
bslma::Allocator* queueThreadAllocator = allocators->get(
1244+
allocatorName.str());
1245+
bsl::shared_ptr<bdlbb::PooledBlobBufferFactory> bufferFactory =
1246+
bsl::allocate_shared<bdlbb::PooledBlobBufferFactory>(
1247+
queueThreadAllocator,
1248+
4096,
1249+
bsls::BlockGrowth::BSLS_CONSTANT);
1250+
1251+
queueThreadAllocators.at(i) = queueThreadAllocator;
1252+
bufferFactories.at(i) = bufferFactory;
1253+
blobSpPools.at(
1254+
i) = bmqp::BlobPoolUtil::createBlobPool(bufferFactory.get(),
1255+
queueThreadAllocator);
1256+
}
1257+
12281258
for (int i = 0; i < config.numPartitions(); ++i) {
1229-
int processorId = i % numProcessors;
1259+
const int processorId = i % numProcessors;
12301260
mqbs::DataStoreConfig dsCfg;
12311261
dsCfg.setScheduler(&clusterData->scheduler())
12321262
.setBufferFactory(&clusterData->bufferFactory())
@@ -1250,27 +1280,19 @@ int StorageUtil::assignPartitionDispatcherThreads(
12501280
dsCfg.setQueueDeletionCb(queueDeletionCb.value());
12511281
}
12521282

1253-
// Get named allocator from associated bmqma::CountingAllocatorStore
1254-
bslma::Allocator* fileStoreAllocator = allocators->get(
1255-
bsl::string("Partition") + bsl::to_string(i));
1256-
1257-
bsl::shared_ptr<mqbs::FileStore> fsSp(
1258-
new (*fileStoreAllocator)
1259-
mqbs::FileStore(dsCfg,
1260-
processorId,
1261-
dispatcher,
1262-
clusterData->membership().netCluster(),
1263-
&clusterData->stats(),
1264-
blobSpPool,
1265-
&clusterData->stateSpPool(),
1266-
threadPool,
1267-
cluster.isCSLModeEnabled(),
1268-
cluster.isFSMWorkflow(),
1269-
replicationFactor,
1270-
fileStoreAllocator),
1271-
fileStoreAllocator);
1272-
1273-
(*fileStores)[i] = fsSp;
1283+
(*fileStores)[i] = bsl::allocate_shared<mqbs::FileStore>(
1284+
queueThreadAllocators.at(processorId),
1285+
dsCfg,
1286+
processorId,
1287+
dispatcher,
1288+
clusterData->membership().netCluster(),
1289+
&clusterData->stats(),
1290+
blobSpPools.at(processorId),
1291+
&clusterData->stateSpPool(),
1292+
threadPool,
1293+
cluster.isCSLModeEnabled(),
1294+
cluster.isFSMWorkflow(),
1295+
replicationFactor);
12741296
}
12751297

12761298
return rc_SUCCESS;

Diff for: src/groups/mqb/mqbs/mqbs_filestore.cpp

+11-10
Original file line numberDiff line numberDiff line change
@@ -3947,7 +3947,8 @@ void FileStore::processReceiptEvent(unsigned int primaryLeaseId,
39473947
if (itNode == d_nodes.end()) {
39483948
// no prior history about this node
39493949
d_nodes.insert(
3950-
bsl::make_pair(nodeId, NodeContext(d_blobSpPool_p, recordKey)));
3950+
bsl::make_pair(nodeId,
3951+
NodeContext(d_blobSpPool_sp.get(), recordKey)));
39513952
from = d_unreceipted.begin();
39523953
}
39533954
else if (itNode->second.d_key < recordKey) {
@@ -5207,7 +5208,7 @@ void FileStore::aliasMessage(bsl::shared_ptr<bdlbb::Blob>* appData,
52075208

52085209
bdlbb::BlobBuffer optionsBlobBuffer(optionsBufferSp, optionsSize);
52095210

5210-
*options = d_blobSpPool_p->getObject();
5211+
*options = d_blobSpPool_sp->getObject();
52115212
(*options)->appendDataBuffer(optionsBlobBuffer);
52125213
}
52135214

@@ -5218,7 +5219,7 @@ void FileStore::aliasMessage(bsl::shared_ptr<bdlbb::Blob>* appData,
52185219
bdlbb::BlobBuffer appDataBlobBuffer(appDataBufferSp,
52195220
record.d_appDataUnpaddedLen);
52205221

5221-
*appData = d_blobSpPool_p->getObject();
5222+
*appData = d_blobSpPool_sp->getObject();
52225223
(*appData)->appendDataBuffer(appDataBlobBuffer);
52235224
}
52245225

@@ -5238,7 +5239,7 @@ FileStore::FileStore(const DataStoreConfig& config,
52385239
mqbi::Dispatcher* dispatcher,
52395240
mqbnet::Cluster* cluster,
52405241
mqbstat::ClusterStats* clusterStats,
5241-
BlobSpPool* blobSpPool,
5242+
BlobSpPoolSp blobSpPool_sp,
52425243
StateSpPool* statePool,
52435244
bdlmt::FixedThreadPool* miscWorkThreadPool,
52445245
bool isCSLModeEnabled,
@@ -5252,7 +5253,7 @@ FileStore::FileStore(const DataStoreConfig& config,
52525253
, d_partitionDescription(allocator)
52535254
, d_dispatcherClientData()
52545255
, d_clusterStats_p(clusterStats)
5255-
, d_blobSpPool_p(blobSpPool)
5256+
, d_blobSpPool_sp(blobSpPool_sp)
52565257
, d_statePool_p(statePool)
52575258
, d_aliasedBufferDeleterSpPool(1024, d_allocators.get("AliasedBufferDeleters"))
52585259
, d_isOpen(false)
@@ -5282,7 +5283,7 @@ FileStore::FileStore(const DataStoreConfig& config,
52825283
, d_nagglePacketCount(k_NAGLE_PACKET_COUNT)
52835284
, d_storageEventBuilder(FileStoreProtocol::k_VERSION,
52845285
bmqp::EventType::e_STORAGE,
5285-
d_blobSpPool_p,
5286+
d_blobSpPool_sp.get(),
52865287
allocator)
52875288
{
52885289
// PRECONDITIONS
@@ -6578,9 +6579,9 @@ FileStore::generateReceipt(NodeContext* nodeContext,
65786579
if (itNode == d_nodes.end()) {
65796580
// no prior history about this node
65806581
itNode = d_nodes
6581-
.insert(
6582-
bsl::make_pair(nodeId,
6583-
NodeContext(d_blobSpPool_p, key)))
6582+
.insert(bsl::make_pair(
6583+
nodeId,
6584+
NodeContext(d_blobSpPool_sp.get(), key)))
65846585
.first;
65856586
}
65866587
nodeContext = &itNode->second;
@@ -6610,7 +6611,7 @@ FileStore::generateReceipt(NodeContext* nodeContext,
66106611
// The pointer `nodeContext->d_blob_sp` might be in a write queue, so
66116612
// it's not safe to modify or replace the blob under this pointer.
66126613
// Instead, we get another shared pointer to another blob.
6613-
nodeContext->d_blob_sp = d_blobSpPool_p->getObject();
6614+
nodeContext->d_blob_sp = d_blobSpPool_sp->getObject();
66146615
bmqp::ProtocolUtil::buildReceipt(nodeContext->d_blob_sp.get(),
66156616
d_config.partitionId(),
66166617
primaryLeaseId,

Diff for: src/groups/mqb/mqbs/mqbs_filestore.h

+3-3
Original file line numberDiff line numberDiff line change
@@ -141,8 +141,8 @@ class FileStore BSLS_KEYWORD_FINAL : public DataStore {
141141

142142
public:
143143
// TYPES
144-
145144
typedef bmqp::BlobPoolUtil::BlobSpPool BlobSpPool;
145+
typedef bmqp::BlobPoolUtil::BlobSpPoolSp BlobSpPoolSp;
146146

147147
/// Pool of shared pointers to AtomicStates
148148
typedef bdlcc::SharedObjectPool<
@@ -290,7 +290,7 @@ class FileStore BSLS_KEYWORD_FINAL : public DataStore {
290290
// used to report partition level
291291
// metrics.
292292

293-
mutable BlobSpPool* d_blobSpPool_p;
293+
mutable BlobSpPoolSp d_blobSpPool_sp;
294294
// Pool of shared pointers to blobs to
295295
// use.
296296

@@ -697,7 +697,7 @@ class FileStore BSLS_KEYWORD_FINAL : public DataStore {
697697
mqbi::Dispatcher* dispatcher,
698698
mqbnet::Cluster* cluster,
699699
mqbstat::ClusterStats* clusterStats,
700-
BlobSpPool* blobSpPool,
700+
BlobSpPoolSp blobSpPool,
701701
StateSpPool* statePool,
702702
bdlmt::FixedThreadPool* miscWorkThreadPool,
703703
bool isCSLModeEnabled,

Diff for: src/groups/mqb/mqbs/mqbs_filestore.t.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ struct Tester {
255255
&d_dispatcher,
256256
d_cluster_mp.get(),
257257
&d_clusterStats,
258-
d_blobSpPool_sp.get(),
258+
d_blobSpPool_sp,
259259
&d_statePool,
260260
&d_miscWorkThreadPool,
261261
false, // isCSLModeEnabled

0 commit comments

Comments
 (0)