-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathLocalMessageBroker.h
More file actions
128 lines (93 loc) · 4.06 KB
/
Copy pathLocalMessageBroker.h
File metadata and controls
128 lines (93 loc) · 4.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#ifndef LOCAL_MESSAGE_BROKER_H
#define LOCAL_MESSAGE_BROKER_H
#include "HashMap.h"
#include "Event.h"
#include "HybridPoolAllocator.h"
#include "PoolAllocator.h"
#include "SharedMemory.h"
#include "LocalMemory.h"
#include "CudaSharedMemory.h"
#include "Uuid.h"
#include "ArrayView.h"
#include "MessageBroker.h"
#include <memory>
#include <array>
#include <optional>
// Compile-time configuration of the local message broker.
//
// These are declared `inline constexpr` so that every translation unit that
// includes this header refers to the same single definition (a plain
// namespace-scope `const` has internal linkage and is duplicated per
// translation unit) and so that all of them, including the version array,
// are usable in constant expressions.

// Broker version as four bytes.
// NOTE(review): presumably major.minor.patch.build -- confirm against the code that reads it.
inline constexpr std::array<std::uint8_t, 4> MESSAGE_BROKER_VERSION = {0, 1, 0, 0};

// NOTE(review): presumably the capacity of the topic hash map -- confirm at the HashMap creation site.
inline constexpr std::size_t TOPIC_HASH_MAP_SIZE = 16384;

// Number of entries in TopicHeader::message_headers.
inline constexpr std::size_t TOPIC_MAX_NUM_MESSAGES = 32;

// Number of entries in MessageBrokerHeader::message_headers.
inline constexpr std::size_t MAX_NUM_MESSAGES = 65536;

// NOTE(review): presumably an upper bound on allocator blocks -- not used in this header; confirm.
inline constexpr std::size_t MAX_NUM_BLOCKS = 8192;

// NOTE(review): presumably the byte alignment of message payloads -- not used in this header; confirm.
inline constexpr std::size_t MEMORY_ALIGNMENT = 32;

// NOTE(review): presumably the smallest pool size handled by the allocators -- not used in this header; confirm.
inline constexpr std::size_t MIN_SIZE_POOL = 1024;
// Per-topic bookkeeping record: three atomic 64-bit frame-id counters, a
// frame rate, and a fixed array of TOPIC_MAX_NUM_MESSAGES 64-bit entries.
// Only declarations are visible here; the member functions are defined
// elsewhere.
// NOTE(review): presumably this record is stored in memory shared between
// processes, which would make member order and size part of the layout --
// confirm before reordering or adding fields.
struct TopicHeader
{
// Atomic frame-id counters.
// NOTE(review): the names suggest "next id to hand out", "oldest retained id"
// and "newest published id" -- confirm against the implementation.
std::atomic_uint64_t next_frame_id;
std::atomic_uint64_t first_frame_id;
std::atomic_uint64_t last_frame_id;
// NOTE(review): presumably the measured publish rate of the topic; the unit
// is not visible here -- confirm.
double frame_rate;
// One 64-bit entry per message slot of the topic.
// NOTE(review): presumably each entry refers to a MessageHeader (index,
// offset or handle) -- confirm against FetchMessage/PublishMessage.
std::array<std::uint64_t, TOPIC_MAX_NUM_MESSAGES> message_headers;
// Queries whether the message with the given frame id is available.
// NOTE(review): exact availability criteria are defined in the implementation.
bool IsMessageAvailable(std::size_t frame_id);
// Queries whether the message with the given frame id will become available.
// NOTE(review): exact semantics are defined in the implementation.
bool WillMessageBeAvailable(std::size_t frame_id);
// Returns the id of the oldest message of this topic.
std::size_t GetOldestMessageId();
// Returns the id of the newest message of this topic.
std::size_t GetNewestMessageId();
// Returns the message rate of this topic.
// NOTE(review): presumably derived from or equal to frame_rate -- confirm.
double GetMessageRate();
};
// Broker-wide header record: identifies the creator (hostname, pid, creation
// time), records the number of memory blocks and the time of last activity,
// and embeds a fixed table of MAX_NUM_MESSAGES MessageHeader entries.
// HOST_NAME_SIZE and MessageHeader are declared outside this file.
// NOTE(review): presumably placed at the start of the broker's header memory
// and shared between processes, which would make the layout part of the
// on-memory format -- confirm before changing member order or types.
struct MessageBrokerHeader
{
// Hostname of the creator, in a fixed-size character buffer.
// NOTE(review): null-termination is not guaranteed by the type -- confirm at the write site.
char creator_hostname[HOST_NAME_SIZE];
// Creation timestamp. NOTE(review): unit/epoch not visible here -- confirm.
std::uint64_t time_of_creation;
// Process id of the creator.
int creator_pid;
// Number of memory blocks associated with the broker.
std::size_t num_memory_blocks;
// Timestamp of the last activity. NOTE(review): unit/epoch and which
// operations update it are not visible here -- confirm.
std::uint64_t time_of_last_activity;
// Fixed-size table of message headers (MAX_NUM_MESSAGES entries).
MessageHeader message_headers[MAX_NUM_MESSAGES];
};
class LocalMessageBroker : public Shareable, public MessageBroker
{
friend class MessageSubscription;
private:
LocalMessageBroker(
MessageBrokerHeader *header,
std::shared_ptr<HashMap> topic_headers,
std::shared_ptr<PoolAllocator> message_header_allocator,
std::shared_ptr<Event> event,
std::vector<std::shared_ptr<HybridPoolAllocator>> allocators,
std::vector<std::shared_ptr<Memory>> memory_blocks,
std::shared_ptr<Memory> header_memory
);
public:
static std::shared_ptr<LocalMessageBroker> Create(StructStream &stream, std::vector<std::shared_ptr<Memory>> memory_blocks);
static std::shared_ptr<LocalMessageBroker> Open(StructStream &stream);
static std::size_t CalculateBufferSize(); // TODO: Add parameters.
// Prepare a message for publishing with a trace ID.
virtual Message PrepareMessageImpl(std::string_view topic, size_t payload_size, Uuid trace_id, uint8_t memory_block_id = 0) override;
// Publish a message.
virtual Message PublishMessage(Message message, bool is_final = true) override;
// Get the newest message for a topic.
virtual std::optional<Message> GetCurrentMessage(std::string_view topic) override;
// Get the message rate for a topic.
virtual double GetMessageRate(std::string_view topic) override;
// Get the message topics for all messages in this broker.
virtual std::vector<std::string> GetAllMessageTopics() override;
ShareableType GetType() const override;
virtual void PrintDebugInfo() const override;
protected:
// Get the next message for a topic.
virtual std::optional<Message> GetNextMessage(std::string_view topic, size_t preferred_next_frame_id, MessageSubscriptionMode mode = MessageSubscriptionMode::NewestOnly, double timeout_in_seconds = -1, EventWaitMethod wait_type = EventWaitMethod::Default, void (*error_check)() = nullptr) override;
// Try to get the next message for a topic.
virtual std::optional<Message> TryGetNextMessage(std::string_view topic, size_t preferred_next_frame_id, MessageSubscriptionMode mode = MessageSubscriptionMode::NewestOnly) override;
private:
Message FetchMessage(TopicHeader *topic_header, size_t frame_id);
std::uint64_t GetNextMessageId(TopicHeader *topic_header, size_t preferred_next_frame_id, MessageSubscriptionMode mode);
std::shared_ptr<HybridPoolAllocator> GetAllocator(uint8_t memory_block_id);
std::shared_ptr<Memory> GetMemory(uint8_t memory_block_id);
TopicHeader *GetTopicHeader(std::string_view topic);
MessageBrokerHeader *m_Header;
std::shared_ptr<HashMap> m_TopicHeaders;
std::shared_ptr<Event> m_Event;
std::shared_ptr<PoolAllocator> m_MessageHeaderAllocator;
MessageHeader *m_MessageHeaders;
std::vector<std::shared_ptr<HybridPoolAllocator>> m_Allocators;
std::vector<std::shared_ptr<Memory>> m_MemoryBlocks;
};
#endif // LOCAL_MESSAGE_BROKER_H