Skip to content

Commit f870432

Browse files
committed
Minor documentation and code updates.
1 parent e44fc44 commit f870432

File tree

7 files changed

+410
-62
lines changed

7 files changed

+410
-62
lines changed

README.md

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ DelegateMQ is a C++ header-only library for invoking any callable (e.g., functio
1414
- Asynchronously
1515
- Remotely across processes or processors
1616

17-
It unifies function calls across threads or systems via a simple delegate interface. DelegateMQ is thread-safe, unit-tested, and easy to port to any platform.
17+
It unifies function calls across threads or systems via a simple delegate interface. A powerful, lightweight messaging library for thread-safe asynchronous callbacks, non-blocking APIs, and passing data between threads. DelegateMQ is thread-safe, unit-tested, and easy to port to any platform.
1818

1919
# Key Concepts
2020

@@ -111,12 +111,15 @@ public:
111111
112112
private:
113113
// Handle publisher callback on m_thread
114-
void HandleMsgCb(const std::string& msg) { std::cout << msg << std::endl; }
114+
void HandleMsgCb(const std::string& msg)
115+
{
116+
std::cout << msg << std::endl;
117+
}
115118
Thread m_thread;
116119
};
117120
```
118121

119-
Multiple callables targets stored and invoked asynchronously.
122+
Multiple callables targets stored and broadcast invoked asynchronously.
120123

121124
```cpp
122125
// Create an async delegate targeting lambda on thread1

docs/DETAILS.md

Lines changed: 118 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,9 @@ The DelegateMQ C++ library enables function invocations on any callable, either
4242
- [The Risk: Raw Pointers](#the-risk-raw-pointers)
4343
- [The Solution: Shared Pointers](#the-solution-shared-pointers)
4444
- [Register and Unregister](#register-and-unregister)
45-
- [The Init/Term Pattern](#the-initterm-pattern)
45+
- [Init/Term Pattern](#initterm-pattern)
46+
- [RAII Pattern](#raii-pattern)
47+
- [Object Lifetime Usage Guide](#object-lifetime-usage-guide)
4648
- [Library Dependencies](#library-dependencies)
4749
- [Fixed-Block Memory Allocator](#fixed-block-memory-allocator)
4850
- [Function Argument Copy](#function-argument-copy)
@@ -244,7 +246,7 @@ Numerous predefined platforms are already supported such as Windows, Linux, and
244246
3. **Implement `ISerializer` and `ITransport`**: Required to use **Remote** delegates across processes/processors.
245247
* *Optional:* Implement `ITransportMonitor` if your application layer requires command acknowledgments (ACKs).
246248
* See [Sample Projects](#sample-projects) for numerous remote delegate examples.
247-
4. **Check System Clock**: Ensure `std::chrono::steady_clock` is supported on your target hardware, as it is required for timers and transport timeouts.
249+
4. **Check System Clock**: Ensure `std::chrono::steady_clock` is supported on your target hardware, as it is required for timers and transport timeouts. Otherwise, change `dmw::Clock` in `DelegateOpt.h` to a new clock type.
248250
5. **Call `Timer::ProcessTimers()`**: Periodically call `ProcessTimers()` (e.g., from a main loop or hardware timer ISR) to support timers and thread watchdogs.
249251
6. **Configure Build Options**: Set CMake DMQ library build options within `CMakeLists.txt`.
250252
* Example: `DMQ_ASSERTS` for debug assertions.
@@ -954,9 +956,9 @@ safeObj.reset();
954956

955957
When using `std::shared_ptr` and `std::enable_shared_from_this`, you must follow a specific pattern to manually register and unregister delegates.
956958

957-
Critical Rule: You cannot call `shared_from_this()` inside a destructor. Doing so causes a std::bad_weak_ptr crash because the ownership of the object has already expired. Therefore, if you require manual unregistration (to stop receiving events immediately), you must use an explicit Init/Term pattern.
959+
Critical Rule: You cannot call `shared_from_this()` inside a destructor. Doing so causes a `std::bad_weak_ptr` crash because the ownership of the object has already expired. Therefore, if you require manual unregistration (to stop receiving events immediately), you must use an explicit Init/Term or RAII pattern.
958960

959-
### The Init/Term Pattern
961+
### Init/Term Pattern
960962

961963
`Init()`: Called after the object is fully constructed and owned by a `shared_ptr`. Registers the delegate.
962964

@@ -1014,6 +1016,65 @@ void RunClient()
10141016
}
10151017
```
10161018

1019+
### RAII Pattern
1020+
1021+
Alternative solution does not require calling `Term()`.
1022+
1023+
```cpp
1024+
class Subscriber : public std::enable_shared_from_this<Subscriber>
1025+
{
1026+
public:
1027+
Subscriber() : m_thread("SubscriberThread") {
1028+
m_thread.CreateThread();
1029+
}
1030+
1031+
// 1. Init: Call this immediately after std::make_shared<Subscriber>
1032+
void Init() {
1033+
// Create the delegate once and STORE it.
1034+
// We pass a shared_ptr, but the delegate internally converts and
1035+
// stores it as a weak_ptr to prevent circular reference cycles.
1036+
m_delegate = dmq::MakeDelegate(
1037+
shared_from_this(),
1038+
&Subscriber::HandleMsgCb,
1039+
m_thread
1040+
);
1041+
1042+
// Register using the member variable
1043+
Publisher::Instance().MsgCb += m_delegate;
1044+
}
1045+
1046+
~Subscriber() {
1047+
// 2. Unregister using the stored member.
1048+
// This works safely even in the destructor because the stored delegate
1049+
// preserves the identity required for removal.
1050+
Publisher::Instance().MsgCb -= m_delegate;
1051+
}
1052+
1053+
private:
1054+
void HandleMsgCb(const std::string& msg) {
1055+
std::cout << msg << std::endl;
1056+
}
1057+
1058+
Thread m_thread;
1059+
1060+
// Store the delegate to allow safe unregistration in the destructor.
1061+
// 'Sp' suffix indicates this delegate is specialized for smart pointers.
1062+
dmq::DelegateMemberAsyncSp<Subscriber, void(const std::string&)> m_delegate;
1063+
};
1064+
```
1065+
1066+
### Object Lifetime Usage Guide
1067+
1068+
This table outlines when it is safe to use raw pointers (`this`) during delegate registration versus when you must use `std::shared_ptr` (`shared_from_this()`) to prevent crashes.
1069+
1070+
| Delegate Type | Behavior | Safe to use `this`? | Explanation |
1071+
| --- | --- | --- | --- |
1072+
| Synchronous | Function is called immediately on the current thread.| YES | The caller waits for the callback to complete. It is impossible for the object to be destroyed while the callback is running. Must unregister in destructor. |
1073+
| Async (Non-Blocking) | "Fire and Forget." Message posted to target thread queue. Caller continues immediately. | NO | Unsafe. Even if you unregister in the destructor, a message may already be pending in the queue. If the object dies before the queue is processed, the target thread will access freed memory. Use `shared_from_this()`. |
1074+
| Async Blocking (`WAIT_INFINITE`) | Caller thread blocks until the target thread executes the function. | YES | Because the caller is blocked, the object (owned by the caller) cannot go out of scope or be destroyed until the callback finishes. |
1075+
| Async Blocking (Timeout) | Caller thread blocks until success OR timeout expires. | NO | Unsafe. If the timeout expires, the caller proceeds and may destroy the object. However, the message is still in the queue. When the target thread eventually processes it, it will crash. Use `shared_from_this()`. |
1076+
| Async (Singleton / Global) | Object lifetime exceeds the thread lifetime (e.g., Singleton, Static, or Global). | YES | Safe. Since the object is guaranteed to exist for the entire duration of the application (or until after the worker thread is destroyed), the pointer will never be invalid. |
1077+
10171078
## Library Dependencies
10181079

10191080
The `DelegateMQ` library external dependencies are based upon on the intended use. Interfaces provide the delegate library with platform-specific features to ease porting to a target system. Complete example code offer ready-made solutions or create your own.
@@ -1738,64 +1799,89 @@ SystemMode::Type SysDataNoLock::SetSystemModeAsyncWaitAPI(SystemMode::Type syste
17381799
// Async send an actuator command and block the caller waiting until the remote
17391800
// client successfully receives the message or timeout. The execution order sequence
17401801
// numbers shown below.
1741-
bool NetworkMgr::SendActuatorMsgWait(ActuatorMsg& msg)
1802+
template <class TClass, class RetType, class... Args>
1803+
bool RemoteInvoke(RemoteEndpoint<TClass, RetType(Args...)>& endpointDel, Args&&... args)
17421804
{
17431805
// If caller is not executing on m_thread
17441806
if (Thread::GetCurrentThreadId() != m_thread.GetThreadId())
17451807
{
17461808
// *** Caller's thread executes this control branch ***
17471809
1748-
std::atomic<bool> success(false);
1749-
std::mutex mtx;
1750-
std::condition_variable cv;
1810+
struct SyncState {
1811+
std::atomic<bool> success{ false };
1812+
bool complete = false;
1813+
std::mutex mtx;
1814+
std::condition_variable cv;
1815+
1816+
// Use fix-block memory allocator if DMQ_ALLOCATOR set
1817+
XALLOCATOR
1818+
};
1819+
1820+
auto state = std::make_shared<SyncState>();
1821+
dmq::DelegateRemoteId remoteId = endpointDel.GetRemoteId();
17511822
1752-
// 7. Callback lambda handler for transport monitor when remote receives message (success or failure).
1823+
// 8. Callback lambda handler for transport monitor when remote receives message (success or failure).
17531824
// Callback context is m_thread.
1754-
SendStatusCallback statusCb = [&success, &cv](dmq::DelegateRemoteId id, uint16_t seqNum, TransportMonitor::Status status) {
1755-
if (id == ids::ACTUATOR_MSG_ID) {
1756-
// Client received ActuatorMsg?
1757-
if (status == TransportMonitor::Status::SUCCESS)
1758-
success.store(true);
1759-
cv.notify_one();
1825+
SendStatusCallback statusCb = [state, remoteId](dmq::DelegateRemoteId id, uint16_t seqNum, TransportMonitor::Status status) {
1826+
if (id == remoteId) {
1827+
{
1828+
std::lock_guard<std::mutex> lock(state->mtx);
1829+
state->complete = true;
1830+
if (status == TransportMonitor::Status::SUCCESS)
1831+
state->success.store(true); // Client received message
1832+
}
1833+
state->cv.notify_one();
17601834
}
17611835
};
17621836
17631837
// 1. Register for send status callback (success or failure)
17641838
m_transportMonitor.SendStatusCb += dmq::MakeDelegate(statusCb);
17651839
1766-
// 2. Async reinvoke SendActuatorMsgWait() on m_thread context and wait for send to complete
1767-
auto del = MakeDelegate(this, &NetworkMgr::SendActuatorMsgWait, m_thread, SEND_TIMEOUT);
1768-
auto retVal = del.AsyncInvoke(msg);
1840+
// 3. Lambda that binds and forwards arguments to RemoteInvoke with specified endpoint
1841+
std::function<RetType(Args...)> func = [this, &endpointDel](Args&&... args) {
1842+
return this->RemoteInvoke<TClass, RetType, Args...>(
1843+
endpointDel, std::forward<Args>(args)...);
1844+
};
1845+
1846+
// 2. Async reinvoke func on m_thread context and wait for send to complete
1847+
auto del = dmq::MakeDelegate(func, m_thread, SEND_TIMEOUT);
1848+
auto retVal = del.AsyncInvoke(std::forward<Args>(args)...);
17691849
1770-
// 5. Check that the remote delegate send succeeded
1850+
// 6. Check that the remote delegate send succeeded
17711851
if (retVal.has_value() && // If async function call succeeded AND
17721852
retVal.value() == true) // async function call returned true
17731853
{
1774-
// 6. Wait for statusCb callback to be invoked. Callback invoked when the
1775-
// receiver ack's the message or timeout.
1776-
std::unique_lock<std::mutex> lock(mtx);
1777-
if (cv.wait_for(lock, RECV_TIMEOUT) == std::cv_status::timeout)
1854+
// 7. Wait for statusCb callback to be invoked. Callback invoked when the
1855+
// receiver ack's the message or transport monitor timeout.
1856+
std::unique_lock<std::mutex> lock(state->mtx);
1857+
while (!state->complete)
17781858
{
1779-
// Timeout waiting for remote delegate message ack
1780-
std::cout << "Timeout SendActuatorMsgWait()" << std::endl;
1859+
if (state->cv.wait_for(lock, RECV_TIMEOUT) == std::cv_status::timeout)
1860+
{
1861+
// Timeout waiting for remote delegate message ack
1862+
std::cout << "Timeout RemoteInvoke()" << std::endl;
1863+
1864+
// Set complete to true to exit wait loop
1865+
state->complete = true;
1866+
}
17811867
}
17821868
}
17831869
1784-
// 8. Unregister from status callback
1870+
// 9. Unregister from status callback
17851871
m_transportMonitor.SendStatusCb -= dmq::MakeDelegate(statusCb);
17861872
1787-
// 9. Return the blocking async function invoke status to caller
1788-
return success.load();
1873+
// 10. Return the blocking async function invoke status to caller
1874+
return state->success.load();
17891875
}
17901876
else
17911877
{
17921878
// *** NetworkMgr::m_thread executes this control branch ***
17931879
1794-
// 3. Send actuator command to remote on m_thread
1795-
m_actuatorMsgDel(msg);
1880+
// 4. Invoke endpoint delegate to send argument data to remote
1881+
endpointDel(std::forward<Args>(args)...);
17961882
1797-
// 4. Check if send succeeded
1798-
if (m_actuatorMsgDel.GetError() == DelegateError::SUCCESS)
1883+
// 5. Check if send succeeded
1884+
if (endpointDel.GetError() == dmq::DelegateError::SUCCESS)
17991885
{
18001886
return true;
18011887
}

example/sample-code/AsyncAPI.cpp

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -50,20 +50,21 @@ namespace Example
5050
}
5151

5252
// Communication class with a fully asynchronous public API
53-
class Communication
53+
// Inherit from enable_shared_from_this to allow safe async delegate creation
54+
class Communication : public std::enable_shared_from_this<Communication>
5455
{
5556
public:
5657
// All public functions async invoke the function call onto comm_thread
5758

5859
// Async API calls the private implementation
5960
size_t SendData(const std::string& data) {
60-
return AsyncInvoke(this, &Communication::SendDataPrivate, comm_thread, WAIT_INFINITE, data);
61+
return AsyncInvoke(shared_from_this(), &Communication::SendDataPrivate, comm_thread, WAIT_INFINITE, data);
6162
}
6263

6364
// Alternate async API implementation style all within the public function
6465
size_t SendDataV2(const std::string& data) {
6566
if (comm_thread.GetThreadId() != Thread::GetCurrentThreadId())
66-
return MakeDelegate(this, &Communication::SendDataV2, comm_thread, WAIT_INFINITE)(data);
67+
return MakeDelegate(shared_from_this(), &Communication::SendDataV2, comm_thread, WAIT_INFINITE)(data);
6768

6869
std::this_thread::sleep_for(std::chrono::seconds(2)); // Simulate sending
6970
cout << data << endl;
@@ -73,7 +74,9 @@ namespace Example
7374
// Alternate async API using a lambda
7475
size_t SendDataV3(const std::string& data)
7576
{
76-
auto sendDataLambda = [this](const std::string& data) -> bool {
77+
// Capture 'self' (shared_ptr) instead of raw 'this' for robustness
78+
auto self = shared_from_this();
79+
auto sendDataLambda = [self](const std::string& data) -> bool {
7780
std::this_thread::sleep_for(std::chrono::seconds(2)); // Simulate sending
7881
cout << data << endl;
7982
return data.size(); // Return the 'bytes_sent' sent result
@@ -83,11 +86,11 @@ namespace Example
8386
}
8487

8588
void SetMode(bool mode) {
86-
AsyncInvoke(this, &Communication::SetModePrivate, comm_thread, WAIT_INFINITE, mode);
89+
AsyncInvoke(shared_from_this(), &Communication::SetModePrivate, comm_thread, WAIT_INFINITE, mode);
8790
}
8891

8992
bool GetMode() {
90-
return AsyncInvoke(this, &Communication::GetModePrivate, comm_thread, WAIT_INFINITE);
93+
return AsyncInvoke(shared_from_this(), &Communication::GetModePrivate, comm_thread, WAIT_INFINITE);
9194
}
9295

9396
private:
@@ -109,14 +112,15 @@ namespace Example
109112
{
110113
comm_thread.CreateThread();
111114

112-
Communication comm;
115+
// Must use std::make_shared to support shared_from_this()
116+
auto comm = std::make_shared<Communication>();
113117

114118
// Test async invoke of member functions
115-
comm.SetMode(true);
116-
bool mode = comm.GetMode();
117-
comm.SendData("SendData message");
118-
comm.SendDataV2("SendDataV2 message");
119-
comm.SendDataV3("SendDataV3 message");
119+
comm->SetMode(true);
120+
bool mode = comm->GetMode();
121+
comm->SendData("SendData message");
122+
comm->SendDataV2("SendDataV2 message");
123+
comm->SendDataV3("SendDataV3 message");
120124

121125
// Test async invoke of free functions
122126
set_mode(true);
@@ -125,4 +129,4 @@ namespace Example
125129

126130
comm_thread.ExitThread();
127131
}
128-
}
132+
}

0 commit comments

Comments
 (0)