Skip to content

Commit 76cf819

Browse files
committed
Use a pool of events to avoid the thundering herd problem.
1 parent 439b54b commit 76cf819

2 files changed

Lines changed: 50 additions & 11 deletions

File tree

catkit_core/LocalMessageBroker.cpp

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ LocalMessageBroker::LocalMessageBroker(
145145
MessageBrokerHeader *header,
146146
std::shared_ptr<HashMap> topic_headers,
147147
std::shared_ptr<PoolAllocator> message_header_allocator,
148-
std::shared_ptr<Event> event,
148+
std::array<std::shared_ptr<Event>, EVENT_POOL_SIZE> event_pool,
149149
std::vector<std::shared_ptr<HybridPoolAllocator>> allocators,
150150
std::vector<std::shared_ptr<Memory>> memory_blocks,
151151
std::shared_ptr<Memory> header_memory
@@ -154,7 +154,7 @@ LocalMessageBroker::LocalMessageBroker(
154154
m_Header(header),
155155
m_TopicHeaders(std::move(topic_headers)),
156156
m_MessageHeaderAllocator(std::move(message_header_allocator)),
157-
m_Event(std::move(event)),
157+
m_EventPool(std::move(event_pool)),
158158
m_Allocators(std::move(allocators)),
159159
m_MemoryBlocks(memory_blocks),
160160
m_MessageHeaders(header->message_headers)
@@ -184,8 +184,14 @@ std::shared_ptr<LocalMessageBroker> LocalMessageBroker::Create(StructStream &str
184184

185185
auto message_header_allocator = PoolAllocator::Create(stream, MAX_NUM_MESSAGES);
186186

187-
std::string id = "catkit2_message_broker_" + std::to_string(header->creator_pid) + "_" + std::to_string(header->time_of_creation);
188-
auto event = Event::Create(stream, id);
187+
// Create event pool for topic-based notification
188+
std::array<std::shared_ptr<Event>, EVENT_POOL_SIZE> event_pool;
189+
std::string base_id = "catkit2_message_broker_" + std::to_string(header->creator_pid) + "_" + std::to_string(header->time_of_creation);
190+
for (size_t i = 0; i < EVENT_POOL_SIZE; ++i)
191+
{
192+
std::string event_id = base_id + "_" + std::to_string(i);
193+
event_pool[i] = Event::Create(stream, event_id);
194+
}
189195

190196
DEBUG_PRINT("Extracting allocators.");
191197

@@ -210,7 +216,7 @@ std::shared_ptr<LocalMessageBroker> LocalMessageBroker::Create(StructStream &str
210216
header,
211217
std::move(topic_headers),
212218
std::move(message_header_allocator),
213-
std::move(event),
219+
std::move(event_pool),
214220
allocators,
215221
memory_blocks,
216222
stream.GetBuffer()
@@ -225,7 +231,13 @@ std::shared_ptr<LocalMessageBroker> LocalMessageBroker::Open(StructStream &strea
225231

226232
auto topic_headers = HashMap::Open(stream);
227233
auto message_header_allocator = PoolAllocator::Open(stream);
228-
auto event = Event::Open(stream);
234+
235+
// Open event pool
236+
std::array<std::shared_ptr<Event>, EVENT_POOL_SIZE> event_pool;
237+
for (size_t i = 0; i < EVENT_POOL_SIZE; ++i)
238+
{
239+
event_pool[i] = Event::Open(stream);
240+
}
229241

230242
std::vector<std::shared_ptr<HybridPoolAllocator>> allocators;
231243
std::vector<std::shared_ptr<Memory>> memory_blocks;
@@ -253,7 +265,7 @@ std::shared_ptr<LocalMessageBroker> LocalMessageBroker::Open(StructStream &strea
253265
header,
254266
std::move(topic_headers),
255267
std::move(message_header_allocator),
256-
std::move(event),
268+
std::move(event_pool),
257269
allocators,
258270
memory_blocks,
259271
stream.GetBuffer()
@@ -362,13 +374,25 @@ Message LocalMessageBroker::PublishMessage(Message message, bool is_final)
362374
auto topic = std::string_view(message.m_Header->topic);
363375
auto allocator = GetAllocator(message.m_Header->payload_info.memory_block_id);
364376

377+
// Track which event indices need to be signaled (to avoid duplicates)
378+
std::array<bool, EVENT_POOL_SIZE> signaled_events{};
379+
signaled_events.fill(false);
380+
std::size_t num_signaled = 0;
381+
365382
// Publish the message to all subtopics.
366383
for (const auto &subtopic : SubtopicRange(topic))
367384
{
368385
auto topic_header = GetTopicHeader(subtopic);
369386

370387
DEBUG_PRINT("Publishing to subtopic \"" << subtopic << "\".");
371388

389+
// Track this subtopic's event for signaling
390+
if (!signaled_events[topic_header->event_index])
391+
{
392+
signaled_events[topic_header->event_index] = true;
393+
num_signaled++;
394+
}
395+
372396
std::uint64_t first_id = topic_header->first_frame_id.load(std::memory_order_relaxed);
373397

374398
std::uint64_t frame_id;
@@ -448,7 +472,15 @@ Message LocalMessageBroker::PublishMessage(Message message, bool is_final)
448472
break;
449473
}
450474

451-
m_Event->Signal();
475+
// Signal all unique events for the subtopics
476+
for (std::size_t i = 0; i < EVENT_POOL_SIZE && num_signaled > 0; ++i)
477+
{
478+
if (signaled_events[i])
479+
{
480+
m_EventPool[i]->Signal();
481+
num_signaled--;
482+
}
483+
}
452484

453485
// Deallocate the message header and payload.
454486
PoolAllocator::BlockHandle message_header_index = message.m_Header - m_MessageHeaders;
@@ -556,8 +588,8 @@ std::optional<Message> LocalMessageBroker::GetNextMessage(std::string_view topic
556588
if (topic_header->IsMessageAvailable(new_frame_id))
557589
return FetchMessage(topic_header, new_frame_id);
558590

559-
// Otherwise, wait for it.
560-
m_Event->Wait(timeout_in_seconds, [topic_header, new_frame_id]() { return topic_header->last_frame_id > new_frame_id; }, wait_type, error_check);
591+
// Otherwise, wait for it on the specific event for this topic.
592+
m_EventPool[topic_header->event_index]->Wait(timeout_in_seconds, [topic_header, new_frame_id]() { return topic_header->last_frame_id > new_frame_id; }, wait_type, error_check);
561593

562594
return FetchMessage(topic_header, new_frame_id);
563595
}
@@ -670,6 +702,7 @@ TopicHeader *LocalMessageBroker::GetTopicHeader(std::string_view topic)
670702
temp_topic_header.first_frame_id = 0;
671703
temp_topic_header.last_frame_id = 0;
672704
temp_topic_header.frame_rate = 0.0;
705+
temp_topic_header.event_index = murmurhash3(topic) % EVENT_POOL_SIZE;
673706

674707
topic_header = (TopicHeader *) m_TopicHeaders->Insert(topic, &temp_topic_header);
675708

catkit_core/LocalMessageBroker.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ const size_t MAX_NUM_MESSAGES = 65536;
2424
const size_t MAX_NUM_BLOCKS = 8192;
2525
const size_t MEMORY_ALIGNMENT = 32;
2626
const size_t MIN_SIZE_POOL = 1024;
27+
const size_t EVENT_POOL_SIZE = 128;
2728

2829
struct TopicHeader
2930
{
@@ -35,6 +36,9 @@ struct TopicHeader
3536

3637
std::array<std::uint64_t, TOPIC_MAX_NUM_MESSAGES> message_headers;
3738

39+
// Pre-computed event index for this topic (hash of topic name % EVENT_POOL_SIZE)
40+
std::uint32_t event_index;
41+
3842
bool IsMessageAvailable(std::size_t frame_id);
3943
bool WillMessageBeAvailable(std::size_t frame_id);
4044
std::size_t GetOldestMessageId();
@@ -65,7 +69,7 @@ class LocalMessageBroker : public Shareable, public MessageBroker
6569
MessageBrokerHeader *header,
6670
std::shared_ptr<HashMap> topic_headers,
6771
std::shared_ptr<PoolAllocator> message_header_allocator,
68-
std::shared_ptr<Event> event,
72+
std::array<std::shared_ptr<Event>, EVENT_POOL_SIZE> event_pool,
6973
std::vector<std::shared_ptr<HybridPoolAllocator>> allocators,
7074
std::vector<std::shared_ptr<Memory>> memory_blocks,
7175
std::shared_ptr<Memory> header_memory
@@ -135,6 +139,8 @@ class LocalMessageBroker : public Shareable, public MessageBroker
135139
std::shared_ptr<PoolAllocator> m_MessageHeaderAllocator;
136140
MessageHeader *m_MessageHeaders;
137141

142+
std::array<std::shared_ptr<Event>, EVENT_POOL_SIZE> m_EventPool;
143+
138144
std::vector<std::shared_ptr<HybridPoolAllocator>> m_Allocators;
139145
std::vector<std::shared_ptr<Memory>> m_MemoryBlocks;
140146
};

0 commit comments

Comments
 (0)