Skip to content

Commit 749ef96

Browse files
committed
Update Timer to support once. Fix thread exit reliability. More unit tests.
1 parent b2175ad commit 749ef96

File tree

9 files changed

+192
-115
lines changed

9 files changed

+192
-115
lines changed

example/sample-projects/system-architecture/client/ClientApp.h

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -62,15 +62,13 @@ class ClientApp
6262

6363
// Start actuator updates
6464
m_actuatorTimer.Expired = MakeDelegate(this, &ClientApp::ActuatorUpdate, m_thread);
65-
m_actuatorTimer.Start(std::chrono::milliseconds(1000));
65+
m_actuatorTimer.Start(std::chrono::milliseconds(1000), true);
6666

6767
return success;
6868
}
6969

7070
void Stop()
7171
{
72-
m_stop = true;
73-
7472
CommandMsg command;
7573
command.action = CommandMsg::Action::STOP;
7674
NetworkMgr::Instance().SendCommandMsg(command);
@@ -95,16 +93,15 @@ class ClientApp
9593
NetworkMgr::SendStatusCb += MakeDelegate(this, &ClientApp::SendStatusHandler, m_thread);
9694
}
9795

98-
~ClientApp()
96+
~ClientApp()
9997
{
98+
NetworkMgr::ErrorCb -= MakeDelegate(this, &ClientApp::ErrorHandler, m_thread);
99+
NetworkMgr::SendStatusCb -= MakeDelegate(this, &ClientApp::SendStatusHandler, m_thread);
100100
m_thread.ExitThread();
101101
}
102102

103103
void PollData()
104104
{
105-
if (m_stop)
106-
return;
107-
108105
// Collect sensor and actuator data
109106
DataMsg dataMsg;
110107
dataMsg.actuators.push_back(m_actuator3.GetState());
@@ -118,9 +115,6 @@ class ClientApp
118115

119116
void ActuatorUpdate()
120117
{
121-
if (m_stop)
122-
return;
123-
124118
static int cnt = 0;
125119
bool position;
126120
if (++cnt % 2)
@@ -144,6 +138,10 @@ class ClientApp
144138

145139
if (!success1 || !success2)
146140
std::cout << "Remote actuator failed!" << std::endl;
141+
142+
// Explicitly start timer for next time. Prevent excessive timer messages in the thread
143+
// queue if "Wait" calls above block a long time due to communications down.
144+
m_actuatorTimer.Start(std::chrono::milliseconds(1000), true);
147145
}
148146

149147
void ErrorHandler(dmq::DelegateRemoteId id, dmq::DelegateError error, dmq::DelegateErrorAux aux)
@@ -167,8 +165,6 @@ class ClientApp
167165
Actuator m_actuator4;
168166
Sensor m_sensor3;
169167
Sensor m_sensor4;
170-
171-
bool m_stop = false;
172168
};
173169

174170
#endif

example/sample-projects/system-architecture/server/main.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ int main()
4343
ServerApp::Instance();
4444

4545
// Let client and server communicate
46-
std::this_thread::sleep_for(std::chrono::seconds(120));
46+
std::this_thread::sleep_for(std::chrono::seconds(45));
4747

4848
NetworkMgr::Instance().Stop();
4949

src/delegate-mq/delegate/DelegateAsyncWait.h

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,7 @@ class DelegateFreeAsyncWait<RetType(Args...)> : public DelegateFree<RetType(Args
350350
return GetRetVal();
351351
} else {
352352
// Return a default return value
353-
return RetType{};
353+
return RetType();
354354
}
355355
}
356356
}
@@ -427,7 +427,7 @@ class DelegateFreeAsyncWait<RetType(Args...)> : public DelegateFree<RetType(Args
427427
return std::any_cast<RetType>(m_retVal);
428428
}
429429
catch (const std::bad_any_cast&) {
430-
return RetType{}; // Return a default value if error
430+
return RetType(); // Return a default value if error
431431
}
432432
}
433433

@@ -753,7 +753,7 @@ class DelegateMemberAsyncWait<TClass, RetType(Args...)> : public DelegateMember<
753753
return GetRetVal();
754754
} else {
755755
// Return a default return value
756-
return RetType{};
756+
return RetType();
757757
}
758758
}
759759
}
@@ -830,7 +830,7 @@ class DelegateMemberAsyncWait<TClass, RetType(Args...)> : public DelegateMember<
830830
return std::any_cast<RetType>(m_retVal);
831831
}
832832
catch (const std::bad_any_cast&) {
833-
return RetType{}; // Return a default value if error
833+
return RetType(); // Return a default value if error
834834
}
835835
}
836836

@@ -1075,7 +1075,7 @@ class DelegateFunctionAsyncWait<RetType(Args...)> : public DelegateFunction<RetT
10751075
return GetRetVal();
10761076
} else {
10771077
// Return a default return value
1078-
return RetType{};
1078+
return RetType();
10791079
}
10801080
}
10811081
}
@@ -1152,7 +1152,7 @@ class DelegateFunctionAsyncWait<RetType(Args...)> : public DelegateFunction<RetT
11521152
return std::any_cast<RetType>(m_retVal);
11531153
}
11541154
catch (const std::bad_any_cast&) {
1155-
return RetType{}; // Return a default value if error
1155+
return RetType(); // Return a default value if error
11561156
}
11571157
}
11581158

src/delegate-mq/delegate/UnicastDelegate.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,10 @@ class UnicastDelegate<RetType(Args...)>
4343
/// @param[in] args The arguments used when invoking the target function
4444
/// @return The target function return value.
4545
RetType operator()(Args... args) {
46-
return (*m_delegate)(args...); // Invoke delegate callback
46+
if (m_delegate)
47+
return (*m_delegate)(args...); // Invoke delegate callback
48+
else
49+
return RetType();
4750
}
4851

4952
/// Invoke the bound target functions.

src/delegate-mq/predef/os/stdlib/Thread.cpp

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ using namespace dmq;
1515
//----------------------------------------------------------------------------
1616
// Thread
1717
//----------------------------------------------------------------------------
18-
Thread::Thread(const std::string& threadName) : m_thread(nullptr), THREAD_NAME(threadName)
18+
Thread::Thread(const std::string& threadName) : m_thread(nullptr), m_exit(false), THREAD_NAME(threadName)
1919
{
2020
}
2121

@@ -109,15 +109,25 @@ void Thread::ExitThread()
109109
m_cv.notify_one();
110110
}
111111

112+
m_exit.store(true);
112113
m_thread->join();
113-
m_thread = nullptr;
114+
115+
// Clear the queue if anything added while waiting for join
116+
{
117+
lock_guard<mutex> lock(m_mutex);
118+
m_thread = nullptr;
119+
while (!m_queue.empty())
120+
m_queue.pop();
121+
}
114122
}
115123

116124
//----------------------------------------------------------------------------
117125
// DispatchDelegate
118126
//----------------------------------------------------------------------------
119127
void Thread::DispatchDelegate(std::shared_ptr<dmq::DelegateMsg> msg)
120128
{
129+
if (m_exit.load())
130+
return;
121131
if (m_thread == nullptr)
122132
throw std::invalid_argument("Thread pointer is null");
123133

src/delegate-mq/predef/os/stdlib/Thread.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ class Thread : public dmq::IThread
6262
// Promise and future to synchronize thread start
6363
std::promise<void> m_threadStartPromise;
6464
std::future<void> m_threadStartFuture;
65+
66+
std::atomic<bool> m_exit;
6567
};
6668

6769
#endif

src/delegate-mq/predef/util/Timer.cpp

Lines changed: 64 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -14,121 +14,130 @@ xlist<Timer*> Timer::m_timers;
1414
//------------------------------------------------------------------------------
1515
static bool TimerDisabled (Timer* value)
1616
{
17-
return !(value->Enabled());
17+
return !(value->Enabled());
1818
}
1919

2020
//------------------------------------------------------------------------------
2121
// Constructor
2222
//------------------------------------------------------------------------------
2323
Timer::Timer()
2424
{
25-
const std::lock_guard<std::mutex> lock(m_lock);
26-
m_enabled = false;
25+
const std::lock_guard<std::mutex> lock(m_lock);
26+
m_enabled = false;
2727
}
2828

2929
//------------------------------------------------------------------------------
3030
// Destructor
3131
//------------------------------------------------------------------------------
3232
Timer::~Timer()
3333
{
34-
const std::lock_guard<std::mutex> lock(m_lock);
35-
m_timers.remove(this);
34+
const std::lock_guard<std::mutex> lock(m_lock);
35+
m_timers.remove(this);
3636
}
3737

3838
//------------------------------------------------------------------------------
3939
// Start
4040
//------------------------------------------------------------------------------
41-
void Timer::Start(std::chrono::milliseconds timeout)
41+
void Timer::Start(std::chrono::milliseconds timeout, bool once)
4242
{
43-
if (timeout <= std::chrono::milliseconds(0))
44-
throw std::invalid_argument("Timeout cannot be 0");
43+
if (timeout <= std::chrono::milliseconds(0))
44+
throw std::invalid_argument("Timeout cannot be 0");
4545

46-
const std::lock_guard<std::mutex> lock(m_lock);
46+
const std::lock_guard<std::mutex> lock(m_lock);
4747

48-
m_timeout = timeout;
49-
m_expireTime = GetTime();
50-
m_enabled = true;
48+
m_timeout = timeout;
49+
m_once = once;
50+
m_expireTime = GetTime();
51+
m_enabled = true;
5152

52-
// Remove the existing entry, if any, to prevent duplicates in the list
53-
m_timers.remove(this);
53+
// Remove the existing entry, if any, to prevent duplicates in the list
54+
m_timers.remove(this);
5455

55-
// Add this timer to the list for servicing
56-
m_timers.push_back(this);
56+
// Add this timer to the list for servicing
57+
m_timers.push_back(this);
5758
}
5859

5960
//------------------------------------------------------------------------------
6061
// Stop
6162
//------------------------------------------------------------------------------
6263
void Timer::Stop()
6364
{
64-
const std::lock_guard<std::mutex> lock(m_lock);
65+
const std::lock_guard<std::mutex> lock(m_lock);
6566

66-
m_enabled = false;
67-
m_timerStopped = true;
67+
m_enabled = false;
68+
m_timerStopped = true;
6869
}
6970

7071
//------------------------------------------------------------------------------
7172
// CheckExpired
7273
//------------------------------------------------------------------------------
7374
void Timer::CheckExpired()
7475
{
75-
if (!m_enabled)
76-
return;
76+
if (!m_enabled)
77+
return;
7778

78-
// Has the timer expired?
79+
// Has the timer expired?
7980
if (Difference(m_expireTime, GetTime()) < m_timeout)
8081
return;
8182

82-
// Increment the timer to the next expiration
83-
m_expireTime += m_timeout;
84-
85-
// Is the timer already expired after we incremented above?
86-
if (Difference(m_expireTime, GetTime()) > m_timeout)
87-
{
88-
// The timer has fallen behind so set time expiration further forward.
89-
m_expireTime = GetTime();
90-
}
91-
92-
// Call the client's expired callback function
93-
if (Expired)
94-
Expired();
83+
if (m_once)
84+
{
85+
m_enabled = false;
86+
m_timerStopped = true;
87+
}
88+
else
89+
{
90+
// Increment the timer to the next expiration
91+
m_expireTime += m_timeout;
92+
93+
// Is the timer already expired after we incremented above?
94+
if (Difference(m_expireTime, GetTime()) > m_timeout)
95+
{
96+
// The timer has fallen behind so set time expiration further forward.
97+
m_expireTime = GetTime();
98+
}
99+
}
100+
101+
// Call the client's expired callback function
102+
//if (Expired)
103+
Expired();
95104
}
96105

97106
//------------------------------------------------------------------------------
98107
// Difference
99108
//------------------------------------------------------------------------------
100109
std::chrono::milliseconds Timer::Difference(std::chrono::milliseconds time1, std::chrono::milliseconds time2)
101110
{
102-
return (time2 - time1);
111+
return (time2 - time1);
103112
}
104113

105114
//------------------------------------------------------------------------------
106115
// ProcessTimers
107116
//------------------------------------------------------------------------------
108117
void Timer::ProcessTimers()
109118
{
110-
const std::lock_guard<std::mutex> lock(m_lock);
111-
112-
// Remove disabled timer from the list if stopped
113-
if (m_timerStopped)
114-
{
115-
m_timers.remove_if(TimerDisabled);
116-
m_timerStopped = false;
117-
}
118-
119-
// Iterate through each timer and check for expirations
120-
TimersIterator it;
121-
for (it = m_timers.begin() ; it != m_timers.end(); it++ )
122-
{
123-
if ((*it) != NULL)
124-
(*it)->CheckExpired();
125-
}
119+
const std::lock_guard<std::mutex> lock(m_lock);
120+
121+
// Remove disabled timer from the list if stopped
122+
if (m_timerStopped)
123+
{
124+
m_timers.remove_if(TimerDisabled);
125+
m_timerStopped = false;
126+
}
127+
128+
// Iterate through each timer and check for expirations
129+
TimersIterator it;
130+
for (it = m_timers.begin() ; it != m_timers.end(); it++ )
131+
{
132+
if ((*it) != NULL)
133+
(*it)->CheckExpired();
134+
}
126135
}
127136

128137
std::chrono::milliseconds Timer::GetTime()
129138
{
130-
auto duration = std::chrono::system_clock::now().time_since_epoch();
131-
auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(duration);
132-
return millis;
139+
auto duration = std::chrono::system_clock::now().time_since_epoch();
140+
auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(duration);
141+
return millis;
133142
}
134143

0 commit comments

Comments
 (0)