Skip to content

Commit ef97e79

Browse files
committed
Minor updates
1 parent 279422d commit ef97e79

File tree

2 files changed

+34
-11
lines changed

2 files changed

+34
-11
lines changed

src/delegate-mq/predef/databus/DataBus.h

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ namespace dmq {
3030
class DataBus {
3131
public:
3232
// Subscribe to a topic with optional QoS and thread dispatching.
33-
// NOTE: This triggers an immediate LVC callback if enabled. There is a small window
34-
// between LVC delivery and actual Signal connection where a publish might be missed.
33+
// NOTE: Signal connection is established before LVC delivery to ensure
34+
// no messages are missed.
3535
template <typename T, typename F>
3636
static dmq::ScopedConnection Subscribe(const std::string& topic, F&& func, dmq::IThread* thread = nullptr, QoS qos = {}) {
3737
return GetInstance().InternalSubscribe<T>(topic, std::forward<F>(func), thread, qos);
@@ -72,9 +72,14 @@ class DataBus {
7272
}
7373

7474
// Subscribe to all bus traffic (topic and stringified value).
75-
static dmq::ScopedConnection Monitor(std::function<void(const SpyPacket&)> func) {
75+
static dmq::ScopedConnection Monitor(std::function<void(const SpyPacket&)> func, dmq::IThread* thread = nullptr, dmq::Priority priority = dmq::Priority::NORMAL) {
7676
DataBus& instance = GetInstance();
7777
std::lock_guard<dmq::RecursiveMutex> lock(instance.m_mutex);
78+
if (thread) {
79+
auto del = dmq::MakeDelegate(std::move(func), *thread);
80+
del.SetPriority(priority);
81+
return instance.m_monitorSignal.Connect(del);
82+
}
7883
return instance.m_monitorSignal.Connect(dmq::MakeDelegate(std::move(func)));
7984
}
8085

@@ -159,13 +164,17 @@ class DataBus {
159164

160165
template <typename T>
161166
void InternalPublish(const std::string& topic, T data) {
167+
// Capture timestamp before lock acquisition for maximum accuracy and
168+
// monotonic ordering using dmq::Clock.
169+
auto now = dmq::Clock::now();
170+
uint64_t timestamp = std::chrono::duration_cast<std::chrono::microseconds>(now.time_since_epoch()).count();
171+
162172
SignalPtr<T> signal;
163173
std::shared_ptr<void> serializerPtr;
164174
dmq::ISerializer<void(T)>* serializer = nullptr;
165175
std::vector<std::shared_ptr<Participant>> participantsSnapshot;
166176
std::string strVal = "?";
167177
bool hasMonitor = false;
168-
uint64_t timestamp = 0;
169178

170179
{
171180
std::lock_guard<dmq::RecursiveMutex> lock(m_mutex);
@@ -186,9 +195,6 @@ class DataBus {
186195
auto func = static_cast<std::function<std::string(const T&)>*>(itStr->second.get());
187196
strVal = (*func)(data);
188197
}
189-
190-
auto now = std::chrono::system_clock::now();
191-
timestamp = std::chrono::duration_cast<std::chrono::microseconds>(now.time_since_epoch()).count();
192198
}
193199

194200
// 3. Get signal and remote info. Only create Signal if there is local interest.
@@ -242,6 +248,18 @@ class DataBus {
242248
template <typename T>
243249
void InternalRegisterStringifier(const std::string& topic, std::function<std::string(const T&)> func) {
244250
std::lock_guard<dmq::RecursiveMutex> lock(m_mutex);
251+
252+
// Runtime Type Safety: Ensure topic is not registered with multiple types
253+
auto itType = m_typeIndices.find(topic);
254+
if (itType != m_typeIndices.end()) {
255+
if (itType->second != std::type_index(typeid(T))) {
256+
::FaultHandler(__FILE__, (unsigned short)__LINE__);
257+
return;
258+
}
259+
} else {
260+
m_typeIndices.emplace(topic, std::type_index(typeid(T)));
261+
}
262+
245263
// Use shared_ptr with custom deleter to fix memory leak
246264
m_stringifiers[topic] = std::shared_ptr<void>(
247265
new std::function<std::string(const T&)>(std::move(func)),

src/delegate-mq/predef/databus/SpyPacket.h

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,20 @@ namespace dmq {
99
/// @brief Standardized packet containing bus traffic metadata.
1010
/// @details This struct is passed to DataBus::Monitor subscribers and is
1111
/// designed to be serialized for transmission to external diagnostic tools.
12+
///
13+
/// The timestamp_us field uses dmq::Clock::now(), which typically provides
14+
/// monotonic time since boot.
1215
struct SpyPacket {
1316
std::string topic; ///< The name of the data topic.
1417
std::string value; ///< Stringified representation of the data (or "?" if no stringifier registered).
15-
uint64_t timestamp_us; ///< Microseconds since epoch when the message was published.
18+
uint64_t timestamp_us; ///< Microseconds (usually since boot) when the message was published.
1619

20+
/// @brief Bitsery serialization method.
21+
/// @note Uses text2b for larger string support (up to 64KB).
1722
template <typename S>
18-
void serialize(S& s) {
19-
s.text1b(topic, 1024);
20-
s.text1b(value, 8192);
23+
void serialize(S& s) const {
24+
s.text2b(topic, 1024);
25+
s.text2b(value, 8192);
2126
s.value8b(timestamp_us);
2227
}
2328
};

0 commit comments

Comments
 (0)