Skip to content

Commit 5840cf2

Browse files
committed
Update example code, delegate containers, and serializers.
1 parent 2f63cb9 commit 5840cf2

File tree

11 files changed

+330
-154
lines changed

11 files changed

+330
-154
lines changed

docs/DETAILS.md

Lines changed: 106 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ The DelegateMQ C++ library enables function invocations on any callable, either
6565
- [Blocking Reinvoke](#blocking-reinvoke)
6666
- [Timer Example](#timer-example)
6767
- [`std::async` Thread Targeting Example](#stdasync-thread-targeting-example)
68+
- [Remote Delegate Example](#remote-delegate-example)
6869
- [More Examples](#more-examples)
6970
- [Sample Projects](#sample-projects)
7071
- [system-architecture](#system-architecture)
@@ -432,6 +433,7 @@ A delegate container stores one or more delegates. A delegate container is calla
432433
```cpp
433434
// Delegate Containers
434435
UnicastDelegate<>
436+
UnicastDelegateSafe<>
435437
MulticastDelegate<>
436438
MulticastDelegateSafe<>
437439
```
@@ -966,6 +968,7 @@ DelegateBase
966968

967969
// Delegate Containers
968970
UnicastDelegate<>
971+
UnicastDelegateSafe<>
969972
MulticastDelegate<>
970973
MulticastDelegateSafe<>
971974
```
@@ -1097,6 +1100,8 @@ There is no way to asynchronously pass a C-style array by value. Avoid C-style a
10971100
10981101
# Interfaces
10991102
1103+
DelegateMQ interface classes allow customizing the library runtime behavior.
1104+
11001105
## `IThread`
11011106
11021107
The `IThread` interface is used to send a delegate and argument data through a message queue. A delegate thread is required to dispatch asynchronous delegates to a specified target thread. The delegate library automatically constructs a `DelegateMsg` containing everything necessary for the destination thread.
@@ -1600,6 +1605,106 @@ void AsyncFutureExample()
16001605
comm_thread.ExitThread();
16011606
}
16021607
```
1608+
## Remote Delegate Example
1609+
1610+
Use a remote delegate to invoke a remote target function using [MessagePack](https://github.com/msgpack/msgpack-c/tree/cpp_master) (serialization) and [ZeroMQ](https://zeromq.org/) (transport). Code snippets below express the concept. See [system-architecture](#system-architecture) to execute this example.
1611+
1612+
`NetworkMgr` class used by client and server applications handles communication with remote device. `SendAlarmMsg()` sends a message to the remote. `AlarmMsgCb` callback is used to receive the incoming message.
1613+
1614+
```cpp
1615+
#include "DelegateMQ.h"
1616+
1617+
// NetworkMgr sends and receives data using a DelegateMQ transport implemented
1618+
// with ZeroMQ and MessagePack libraries.
1619+
class NetworkMgr
1620+
{
1621+
public:
1622+
static const dmq::DelegateRemoteId ALARM_MSG_ID = 1;
1623+
1624+
using AlarmFunc = void(AlarmMsg&, AlarmNote&);
1625+
using AlarmDel = dmq::MulticastDelegateSafe<AlarmFunc>;
1626+
1627+
// Receive callbacks for incoming messages by registering with this container
1628+
static AlarmDel AlarmMsgCb;
1629+
1630+
int Create();
1631+
1632+
// Send alarm message to the remote
1633+
void SendAlarmMsg(AlarmMsg& msg, AlarmNote& note);
1634+
1635+
private:
1636+
// Serialize function argument data into a stream
1637+
xostringstream m_argStream;
1638+
1639+
// Transport using ZeroMQ library. Only call transport from NetworkMsg thread.
1640+
ZeroMqTransport m_transport;
1641+
1642+
// Dispatcher using DelegateMQ library
1643+
Dispatcher m_dispatcher;
1644+
1645+
// Alarm message remote delegate
1646+
Serializer<AlarmFunc> m_alarmMsgSer;
1647+
dmq::DelegateMemberRemote<AlarmDel, AlarmFunc> m_alarmMsgDel;
1648+
};
1649+
```
1650+
1651+
At runtime, setup the remote delegate interface. A key point is the remote delegate (`m_alarmMsgDel`) binds directly to the `AlarmMsgCb` container (`dmq::MulticastDelegateSafe<AlarmFunc>`). Every incoming message generates one or more callbacks to registered listeners by binding `m_alarmMsgDel` remote delegate to the `AlarmMsgCb` container. A delegate may bind to any callable, and a delegate container is callable.
1652+
1653+
```cpp
1654+
int NetworkMgr::Create()
1655+
{
1656+
// Reinvoke call onto NetworkMgr thread
1657+
if (Thread::GetCurrentThreadId() != m_thread.GetThreadId())
1658+
return MakeDelegate(this, &NetworkMgr::Create, m_thread, WAIT_INFINITE)();
1659+
1660+
// Setup the remote delegate interfaces
1661+
m_alarmMsgDel.SetStream(&m_argStream);
1662+
m_alarmMsgDel.SetSerializer(&m_alarmMsgSer);
1663+
m_alarmMsgDel.SetDispatcher(&m_dispatcher);
1664+
m_alarmMsgDel.SetErrorHandler(MakeDelegate(this, &NetworkMgr::ErrorHandler));
1665+
1666+
// Bind the remote delegate to the AlarmMsgCb delegate container
1667+
m_alarmMsgDel.Bind(&AlarmMsgCb, &AlarmDel::operator(), ALARM_MSG_ID);
1668+
1669+
// Set the transport used by the dispatcher
1670+
m_dispatcher.SetTransport(&m_transport);
1671+
1672+
return 1;
1673+
}
1674+
```
1675+
1676+
Send message to the remote by calling the `m_alarmMsgDel` delegate on `m_thread` context. A ZeroMQ socket instance is not thread safe so ZeroMQ socket access is always on the `NetworkMgr` internal thread.
1677+
1678+
```cpp
1679+
void NetworkMgr::SendAlarmMsg(AlarmMsg& msg, AlarmNote& note)
1680+
{
1681+
// Reinvoke call onto NetworkMgr thread
1682+
if (Thread::GetCurrentThreadId() != m_thread.GetThreadId())
1683+
return MakeDelegate(this, &NetworkMgr::SendAlarmMsg, m_thread)(msg, note);
1684+
1685+
// Send alarm to remote. Invoke remote delegate on m_thread only.
1686+
m_alarmMsgDel(msg, note);
1687+
}
1688+
```
1689+
1690+
Client registers to receive remote delegate callbacks on the `AlarmMgr` private thread.
1691+
1692+
```cpp
1693+
class AlarmMgr
1694+
{
1695+
AlarmMgr()
1696+
{
1697+
// Register for alarm callback on m_thread
1698+
NetworkMgr::AlarmMsgCb += MakeDelegate(this, &AlarmMgr::RecvAlarmMsg, m_thread);
1699+
}
1700+
1701+
void RecvAlarmMsg(AlarmMsg& msg, AlarmNote& note)
1702+
{
1703+
// Handle incoming alarm message from remote
1704+
}
1705+
};
1706+
```
1707+
16031708
## More Examples
16041709

16051710
See the `examples/sample-code` directory for additional examples.
@@ -1610,7 +1715,7 @@ See the `examples/sample-projects` directory for example project. Most projects
16101715

16111716
### system-architecture
16121717

1613-
The System Architecture example demonstrates a complex client-server DelegateMQ application using the ZeroMQ and MessagePack support libraries. This example implements the acquisition of sensor and actuator data across two applications. It showcases communication and collaboration between subsystems, threads, and processes or processors. Delegate communication, callbacks, asynchronous APIs, and error handing are also highlighted. Notice how easily DelegateMQ transfers event data between threads and processes with minimal application code.
1718+
The System Architecture example demonstrates a complex client-server DelegateMQ application using the ZeroMQ and MessagePack support libraries. This example implements the acquisition of sensor and actuator data across two applications. It showcases communication and collaboration between subsystems, threads, and processes or processors. Delegate communication, callbacks, asynchronous APIs, and error handing are also highlighted. Notice how easily DelegateMQ transfers event data between threads and processes with minimal application code. The application layer is completely isolated from message passing details.
16141719

16151720
Execute the client and server projects to run the example.
16161721

example/sample-projects/system-architecture/client/ClientApp.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ class ClientApp
3131
// Callback to capture NetworkMgr::SendCommandMsg() success or error
3232
ErrorCallback errorCb = [&success, &cv](dmq::DelegateRemoteId id, dmq::DelegateError err, dmq::DelegateErrorAux aux) {
3333
// SendCommandMsg() ID?
34-
if (id == COMMAND_MSG_ID) {
34+
if (id == NetworkMgr::COMMAND_MSG_ID) {
3535
// Send success?
3636
if (err == dmq::DelegateError::SUCCESS)
3737
success = true;

example/sample-projects/system-architecture/common/NetworkMgr.cpp

Lines changed: 26 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -47,25 +47,26 @@ int NetworkMgr::Create()
4747
m_alarmMsgDel.SetSerializer(&m_alarmMsgSer);
4848
m_alarmMsgDel.SetDispatcher(&m_dispatcher);
4949
m_alarmMsgDel.SetErrorHandler(MakeDelegate(this, &NetworkMgr::ErrorHandler));
50-
m_alarmMsgDel = MakeDelegate(this, &NetworkMgr::RecvAlarmMsg, ALARM_MSG_ID);
50+
// Bind the remote delegate to the AlarmMsgCb delegate container for incoming msgs
51+
m_alarmMsgDel.Bind(&AlarmMsgCb, &AlarmDel::operator(), ALARM_MSG_ID);
5152

5253
m_dataMsgDel.SetStream(&m_argStream);
5354
m_dataMsgDel.SetSerializer(&m_dataMsgSer);
5455
m_dataMsgDel.SetDispatcher(&m_dispatcher);
5556
m_dataMsgDel.SetErrorHandler(MakeDelegate(this, &NetworkMgr::ErrorHandler));
56-
m_dataMsgDel = MakeDelegate(this, &NetworkMgr::RecvDataMsg, DATA_MSG_ID);
57+
m_dataMsgDel.Bind(&DataMsgCb, &DataDel::operator(), DATA_MSG_ID);
5758

5859
m_commandMsgDel.SetStream(&m_argStream);
5960
m_commandMsgDel.SetSerializer(&m_commandMsgSer);
6061
m_commandMsgDel.SetDispatcher(&m_dispatcher);
6162
m_commandMsgDel.SetErrorHandler(MakeDelegate(this, &NetworkMgr::ErrorHandler));
62-
m_commandMsgDel = MakeDelegate(this, &NetworkMgr::RecvCommandMsg, COMMAND_MSG_ID);
63+
m_commandMsgDel.Bind(&CommandMsgCb, &CommandDel::operator(), COMMAND_MSG_ID);
6364

6465
m_actuatorMsgDel.SetStream(&m_argStream);
6566
m_actuatorMsgDel.SetSerializer(&m_actuatorMsgSer);
6667
m_actuatorMsgDel.SetDispatcher(&m_dispatcher);
6768
m_actuatorMsgDel.SetErrorHandler(MakeDelegate(this, &NetworkMgr::ErrorHandler));
68-
m_actuatorMsgDel = MakeDelegate(this, &NetworkMgr::RecvActuatorMsg, ACTUATOR_MSG_ID);
69+
m_actuatorMsgDel.Bind(&ActuatorMsgCb, &ActuatorDel::operator(), ACTUATOR_MSG_ID);
6970

7071
int err = 0;
7172
#ifdef SERVER_APP
@@ -81,7 +82,7 @@ int NetworkMgr::Create()
8182
// Set the transport used by the dispatcher
8283
m_dispatcher.SetTransport(&m_transport);
8384

84-
// Set receive async delegates into map
85+
// Set remote delegate into map
8586
m_receiveIdMap[ALARM_MSG_ID] = &m_alarmMsgDel;
8687
m_receiveIdMap[COMMAND_MSG_ID] = &m_commandMsgDel;
8788
m_receiveIdMap[DATA_MSG_ID] = &m_dataMsgDel;
@@ -123,7 +124,7 @@ void NetworkMgr::SendAlarmMsg(AlarmMsg& msg, AlarmNote& note)
123124
if (Thread::GetCurrentThreadId() != m_thread.GetThreadId())
124125
return MakeDelegate(this, &NetworkMgr::SendAlarmMsg, m_thread)(msg, note);
125126

126-
// Send alarm to remote.
127+
// Send alarm to remote. Invoke remote delegate on m_thread only.
127128
m_alarmMsgDel(msg, note);
128129
}
129130

@@ -145,18 +146,20 @@ void NetworkMgr::SendDataMsg(DataMsg& data)
145146
m_dataMsgDel(data);
146147
}
147148

148-
// Async send an actuator command and block the caller waiting for the client
149-
// to respond. The execution sequence numbers shown below.
149+
// Async send an actuator command and block the caller waiting until the client
150+
// responds or timeout. The execution sequence numbers shown below.
150151
bool NetworkMgr::SendActuatorMsgWait(ActuatorMsg& msg)
151152
{
152153
// If caller is not executing on m_thread
153154
if (Thread::GetCurrentThreadId() != m_thread.GetThreadId())
154155
{
156+
// *** Caller's thread executes this control brach ***
157+
155158
bool success = false;
156159
std::mutex mtx;
157160
std::condition_variable cv;
158161

159-
// 1. Callback handler for transport monitor send status
162+
// 5. Callback handler for transport monitor send status
160163
SendStatusCallback statusCb = [&success, &cv](uint16_t seqNum, dmq::DelegateRemoteId id, TransportMonitor::Status status) {
161164
if (id == ACTUATOR_MSG_ID) {
162165
// Client received ActuatorMsg?
@@ -166,15 +169,15 @@ bool NetworkMgr::SendActuatorMsgWait(ActuatorMsg& msg)
166169
}
167170
};
168171

169-
// 2. Register for send status callback (success or failure)
172+
// 1. Register for send status callback (success or failure)
170173
m_transportMonitor.SendStatusCb += dmq::MakeDelegate(statusCb);
171174

172-
// 3. Renvoke SendActuatorMsgWait() on m_thread context
175+
// 2. Reinvoke SendActuatorMsgWait() on m_thread context
173176
MakeDelegate(this, &NetworkMgr::SendActuatorMsgWait, m_thread)(msg);
174177

175-
// 5. Wait for statusCb callback to be triggered on m_thread
178+
// 3. Wait for statusCb callback to be triggered on m_thread
176179
std::unique_lock<std::mutex> lock(mtx);
177-
if (cv.wait_for(lock, TIMEOUT + std::chrono::milliseconds(100)) == std::cv_status::timeout)
180+
if (cv.wait_for(lock, TIMEOUT) == std::cv_status::timeout)
178181
{
179182
// A timeout should never happen. The transport monitor is setup for a max
180183
// TIMEOUT, so success or failure should never cause cv_status::timeout
@@ -184,13 +187,17 @@ bool NetworkMgr::SendActuatorMsgWait(ActuatorMsg& msg)
184187
// 6. Unregister from status callback
185188
m_transportMonitor.SendStatusCb -= dmq::MakeDelegate(statusCb);
186189

187-
// 7. Return the blocking async function invoke status
190+
// 7. Return the blocking async function invoke status to caller
188191
return success;
189192
}
193+
else
194+
{
195+
// *** NetworkMgr::m_thread executes this control brach ***
190196

191-
// 4. Send actuator command to remote on m_thread.
192-
m_actuatorMsgDel(msg);
193-
return true;
197+
// 4. Send actuator command to remote on m_thread.
198+
m_actuatorMsgDel(msg);
199+
return true;
200+
}
194201
}
195202

196203
// Poll called periodically on m_thread context
@@ -209,7 +216,8 @@ void NetworkMgr::Poll()
209216
auto receiveDelegate = m_receiveIdMap[header.GetId()];
210217
if (receiveDelegate)
211218
{
212-
// Invoke the receiver target function with the sender's argument data
219+
// Invoke the receiver target callable with the sender's
220+
// incoming argument data
213221
receiveDelegate->Invoke(arg_data);
214222
}
215223
else
@@ -236,23 +244,4 @@ void NetworkMgr::SendStatusHandler(uint16_t seqNum, dmq::DelegateRemoteId id, Tr
236244
SendStatusCb(seqNum, id, status);
237245
}
238246

239-
void NetworkMgr::RecvAlarmMsg(AlarmMsg& msg, AlarmNote& note)
240-
{
241-
AlarmMsgCb(msg, note);
242-
}
243-
244-
void NetworkMgr::RecvCommandMsg(CommandMsg& command)
245-
{
246-
CommandMsgCb(command);
247-
}
248-
249-
void NetworkMgr::RecvDataMsg(DataMsg& data)
250-
{
251-
DataMsgCb(data);
252-
}
253-
254-
void NetworkMgr::RecvActuatorMsg(ActuatorMsg& msg)
255-
{
256-
ActuatorMsgCb(msg);
257-
}
258247

0 commit comments

Comments
 (0)