Skip to content

Commit 4f10da2

Browse files
committed
Update RemoteDelegate error handling. Update examples.
1 parent f6c8837 commit 4f10da2

File tree

20 files changed

+226
-75
lines changed

20 files changed

+226
-75
lines changed

example/sample-projects/mqtt-rapidjson/common/data.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ class DataPoint
4949
class Data
5050
{
5151
public:
52+
static const dmq::DelegateRemoteId DATA_ID = 1;
53+
5254
std::vector<DataPoint> dataPoints;
5355
std::string msg;
5456

example/sample-projects/mqtt-rapidjson/pub/main.cpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,11 @@ static void ProcessTimers()
1919

2020
int main()
2121
{
22-
const DelegateRemoteId id = 1;
23-
2422
// Start the thread that will run ProcessTimers
2523
std::thread timerThread(ProcessTimers);
2624

2725
// Create Sender object
28-
Sender sender(id);
26+
Sender sender;
2927

3028
sender.Start();
3129

example/sample-projects/mqtt-rapidjson/pub/sender.h

Lines changed: 56 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,16 @@
1111
using namespace dmq;
1212
using namespace std;
1313

14+
typedef std::function<void(dmq::DelegateRemoteId, dmq::DelegateError, dmq::DelegateErrorAux)> ErrorCallback;
15+
1416
/// @brief Sender is an active object with a thread. The thread sends data to the
1517
/// Receiver every time the timer expires.
1618
class Sender
1719
{
1820
public:
19-
Sender(DelegateRemoteId id) :
21+
Sender() :
2022
m_thread("Sender"),
21-
m_sendDelegate(id),
23+
m_sendDelegate(Data::DATA_ID),
2224
m_argStream(ios::in | ios::out | ios::binary)
2325
{
2426
// Set the delegate interfaces
@@ -35,25 +37,34 @@ class Sender
3537
m_thread.CreateThread();
3638
}
3739

38-
~Sender() { Stop(); }
40+
~Sender() { m_thread.ExitThread(); }
3941

4042
void Start()
4143
{
44+
if (Thread::GetCurrentThreadId() != m_thread.GetThreadId())
45+
return MakeDelegate(this, &Sender::Start, m_thread)();
46+
4247
// Start a timer to send data
4348
m_sendTimer.Expired = MakeDelegate(this, &Sender::Send, m_thread);
49+
//m_sendTimer.Expired = MakeDelegate(this, &Sender::SendV2, m_thread);
4450
m_sendTimer.Start(std::chrono::milliseconds(500));
4551
}
4652

4753
void Stop()
4854
{
55+
if (Thread::GetCurrentThreadId() != m_thread.GetThreadId())
56+
return MakeDelegate(this, &Sender::Stop, m_thread)();
57+
4958
m_sendTimer.Stop();
50-
m_thread.ExitThread();
5159
m_transport.Close();
5260
}
5361

5462
// Send data to the remote
5563
void Send()
5664
{
65+
if (Thread::GetCurrentThreadId() != m_thread.GetThreadId())
66+
return MakeDelegate(this, &Sender::Send, m_thread)();
67+
5768
Data data;
5869
for (int i = 0; i < 5; i++)
5970
{
@@ -67,10 +78,49 @@ class Sender
6778
m_sendDelegate(data);
6879
}
6980

81+
// Send data to the remote and capture send success or error with lambda
82+
void SendV2()
83+
{
84+
if (Thread::GetCurrentThreadId() != m_thread.GetThreadId())
85+
return MakeDelegate(this, &Sender::SendV2, m_thread)();
86+
87+
bool success = false;
88+
89+
// Callback to capture invoke m_sendDelegate() success or error
90+
ErrorCallback errorCb = [&success](dmq::DelegateRemoteId id, dmq::DelegateError err, dmq::DelegateErrorAux aux) {
91+
if (id == Data::DATA_ID) {
92+
// Send success?
93+
if (err == dmq::DelegateError::SUCCESS)
94+
success = true;
95+
}
96+
};
97+
98+
// Register for callback
99+
m_sendDelegate.SetErrorHandler(MakeDelegate(errorCb));
100+
101+
// Create data
102+
Data data;
103+
for (int i = 0; i < 5; i++)
104+
{
105+
DataPoint dataPoint;
106+
dataPoint.x = x++;
107+
dataPoint.y = y++;
108+
data.dataPoints.push_back(dataPoint);
109+
}
110+
data.msg = "Data Message ";
111+
112+
// Send data to remote receiver
113+
m_sendDelegate(data);
114+
115+
// Send complete; unregister from callback
116+
m_sendDelegate.ClearErrorHandler();
117+
}
118+
70119
private:
71-
void ErrorHandler(DelegateRemoteId, DelegateError, DelegateErrorAux)
120+
void ErrorHandler(DelegateRemoteId, DelegateError err, DelegateErrorAux)
72121
{
73-
ASSERT_TRUE(0);
122+
if (err != dmq::DelegateError::SUCCESS)
123+
ASSERT_TRUE(0);
74124
}
75125

76126
Thread m_thread;

example/sample-projects/mqtt-rapidjson/sub/main.cpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,14 @@ void DataCb(Data& data)
2727

2828
int main()
2929
{
30-
const DelegateRemoteId id = 1;
31-
3230
// Start the thread that will run ProcessTimers
3331
std::thread timerThread(ProcessTimers);
3432

3533
Thread recvThread("RecvThread");
3634
recvThread.CreateThread();
3735

3836
// Create Receiver object
39-
Receiver receiver(id);
37+
Receiver receiver;
4038
receiver.DataCb += dmq::MakeDelegate(&DataCb, recvThread);
4139

4240
receiver.Start();

example/sample-projects/mqtt-rapidjson/sub/receiver.h

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,15 @@ class Receiver : public IMqttReceiveHandler
1919
public:
2020
dmq::MulticastDelegateSafe<void(Data&)> DataCb;
2121

22-
Receiver(DelegateRemoteId id) :
23-
m_id(id),
22+
Receiver() :
2423
m_thread("Receiver"),
2524
m_argStream(ios::in | ios::out | ios::binary)
2625
{
2726
// Set the delegate interfaces
2827
m_recvDelegate.SetStream(&m_argStream);
2928
m_recvDelegate.SetSerializer(&m_serializer);
30-
m_recvDelegate = MakeDelegate(this, &Receiver::DataUpdate, id);
29+
m_recvDelegate.SetErrorHandler(MakeDelegate(this, &Receiver::ErrorHandler));
30+
m_recvDelegate = MakeDelegate(this, &Receiver::DataUpdate, Data::DATA_ID);
3131

3232
// Set the transport
3333
m_transport.Create(MqttTransport::Type::SUB);
@@ -76,6 +76,12 @@ class Receiver : public IMqttReceiveHandler
7676
DataCb(data);
7777
}
7878

79+
void ErrorHandler(DelegateRemoteId, DelegateError err, DelegateErrorAux)
80+
{
81+
if (err != dmq::DelegateError::SUCCESS)
82+
ASSERT_TRUE(0);
83+
}
84+
7985
DelegateRemoteId m_id;
8086
Thread m_thread;
8187
Timer m_recvTimer;

example/sample-projects/system-architecture/client/ClientApp.h

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
#include "Sensor.h"
77
#include "DataMsg.h"
88

9+
typedef std::function<void(dmq::DelegateRemoteId, dmq::DelegateError, dmq::DelegateErrorAux)> ErrorCallback;
10+
911
// ClientApp reads data locally and sends to DataMgr for storage.
1012
class ClientApp
1113
{
@@ -16,17 +18,49 @@ class ClientApp
1618
return instance;
1719
}
1820

19-
void Start()
21+
/// Start data collection locally (polling with a timer) and remotely (sending a
22+
/// start message to ServerApp).
23+
/// @return `true` if data collection command sent successfully to ServerApp.
24+
/// `false` if the send fails.
25+
bool Start()
2026
{
21-
// Send message to start remote data collection
27+
bool success = false;
28+
std::mutex mtx;
29+
std::condition_variable cv;
30+
31+
// Callback to capture NetworkMgr::SendCommandMsg() success or error
32+
ErrorCallback errorCb = [&success, &cv](dmq::DelegateRemoteId id, dmq::DelegateError err, dmq::DelegateErrorAux aux) {
33+
// SendCommandMsg() ID?
34+
if (id == COMMAND_MSG_ID) {
35+
// Send success?
36+
if (err == dmq::DelegateError::SUCCESS)
37+
success = true;
38+
cv.notify_one();
39+
}
40+
};
41+
42+
// Register for callback
43+
NetworkMgr::ErrorCb += MakeDelegate(errorCb);
44+
2245
CommandMsg command;
2346
command.action = CommandMsg::Action::START;
2447
command.pollTime = 250;
48+
49+
// Async send message to start remote data collection
2550
NetworkMgr::Instance().SendCommandMsg(command);
2651

52+
// Wait for async send callback to be triggered
53+
std::unique_lock<std::mutex> lock(mtx);
54+
cv.wait(lock);
55+
56+
// Send complete; unregister from callback
57+
NetworkMgr::ErrorCb -= MakeDelegate(errorCb);
58+
2759
// Start local data collection
2860
m_pollTimer.Expired = MakeDelegate(this, &ClientApp::PollData, m_thread);
2961
m_pollTimer.Start(std::chrono::milliseconds(500));
62+
63+
return success;
3064
}
3165

3266
void Stop()
@@ -73,7 +107,8 @@ class ClientApp
73107

74108
void ErrorHandler(dmq::DelegateRemoteId id, dmq::DelegateError error, dmq::DelegateErrorAux aux)
75109
{
76-
std::cout << "ClientApp Error: " << id << " " << (int)error << " " << aux << std::endl;
110+
if (error != dmq::DelegateError::SUCCESS)
111+
std::cout << "ClientApp Error: " << id << " " << (int)error << " " << aux << std::endl;
77112
}
78113

79114
void TimeoutHandler(uint16_t seqNum, dmq::DelegateRemoteId id)

example/sample-projects/system-architecture/common/AlarmMgr.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ class AlarmMgr
6565

6666
void ErrorHandler(dmq::DelegateRemoteId id, dmq::DelegateError error, dmq::DelegateErrorAux aux)
6767
{
68-
std::cout << "AlarmMgr Error: " << id << " " << (int)error << " " << aux << std::endl;
68+
if (error != dmq::DelegateError::SUCCESS)
69+
std::cout << "AlarmMgr Error: " << id << " " << (int)error << " " << aux << std::endl;
6970
}
7071

7172
Thread m_thread;

example/sample-projects/system-architecture/common/NetworkMgr.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,12 @@ static const dmq::DelegateRemoteId ALARM_MSG_ID = 1;
1717
static const dmq::DelegateRemoteId DATA_MSG_ID = 2;
1818
static const dmq::DelegateRemoteId COMMAND_MSG_ID = 3;
1919

20-
// NetworkMgr sends and receives data using a delegate transport implemented
21-
// using ZeroMQ library. Class is thread safe. All public APIs are asynchronous.
20+
/// @brief NetworkMgr sends and receives data using a delegate transport implemented
21+
/// using ZeroMQ library. Class is thread safe. All public APIs are asynchronous.
22+
///
23+
/// @details NetworkMgr has its own internal thread of control. All DelegateMemberRemote<>
24+
/// must only be invoked on the internal thread. All public APIs are asynchronous.
25+
/// Register with ErrorCb or TimeoutCb to handle errors.
2226
class NetworkMgr
2327
{
2428
public:

example/sample-projects/system-architecture/server/ServerApp.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,8 @@ class ServerApp
8484

8585
void ErrorHandler(dmq::DelegateRemoteId id, dmq::DelegateError error, dmq::DelegateErrorAux aux)
8686
{
87-
std::cout << "ServerApp Error: " << id << " " << (int)error << " " << aux << std::endl;
87+
if (error != dmq::DelegateError::SUCCESS)
88+
std::cout << "ServerApp Error: " << id << " " << (int)error << " " << aux << std::endl;
8889
}
8990

9091
void TimeoutHandler(uint16_t seqNum, dmq::DelegateRemoteId id)

example/sample-projects/system-architecture/server/main.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ int main()
4343
ServerApp::Instance();
4444

4545
// Let client and server communicate
46-
std::this_thread::sleep_for(std::chrono::seconds(1000));
46+
std::this_thread::sleep_for(std::chrono::seconds(120));
4747

4848
NetworkMgr::Instance().Stop();
4949

0 commit comments

Comments
 (0)