Skip to content

Commit bae7d25

Browse files
authored
Socket automatic reconnections in dedicated thread (nasa#4136)
* Initial addition of dedicated reconnect task in SocketComponentHelper * Updates to fix UT errors, address FIXMEs, update comments * Additional FIXMEs and doc updates * Spelling fix * Format * Format and some delay updates * Address second request FIXMEs (remove) * Unconditional stop & join in UDP tester cleanup * Address FIXMEs and update Task to init singleton in a thread and address sanitizer safe manner * Spelling * Address PR review comments
1 parent 40142d7 commit bae7d25

5 files changed

Lines changed: 256 additions & 39 deletions

File tree

Drv/Ip/SocketComponentHelper.cpp

Lines changed: 162 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,22 @@ SocketComponentHelper::~SocketComponentHelper() {}
2424
void SocketComponentHelper::start(const Fw::StringBase& name,
2525
const FwTaskPriorityType priority,
2626
const Os::Task::ParamType stack,
27-
const Os::Task::ParamType cpuAffinity) {
27+
const Os::Task::ParamType cpuAffinity,
28+
const FwTaskPriorityType priorityReconnect,
29+
const Os::Task::ParamType stackReconnect,
30+
const Os::Task::ParamType cpuAffinityReconnect) {
31+
// Reconnect Thread
32+
FW_ASSERT(m_reconnectTask.getState() ==
33+
Os::Task::State::NOT_STARTED); // It is a coding error to start this task multiple times
34+
this->m_reconnectStop = false;
35+
Fw::String reconnectName;
36+
reconnectName.format("%s_reconnect", name.toChar());
37+
Os::Task::Arguments reconnectArguments(reconnectName, SocketComponentHelper::reconnectTask, this, priorityReconnect,
38+
stackReconnect, cpuAffinityReconnect);
39+
Os::Task::Status reconnectStat = m_reconnectTask.start(reconnectArguments);
40+
FW_ASSERT(Os::Task::OP_OK == reconnectStat, static_cast<FwAssertArgType>(reconnectStat));
41+
42+
// Read Thread
2843
FW_ASSERT(m_task.getState() ==
2944
Os::Task::State::NOT_STARTED); // It is a coding error to start this task multiple times
3045
this->m_stop = false;
@@ -80,18 +95,19 @@ void SocketComponentHelper::setAutomaticOpen(bool auto_open) {
8095
this->m_reopen = auto_open;
8196
}
8297

98+
bool SocketComponentHelper::getAutomaticOpen() {
99+
Os::ScopeLock scopedLock(this->m_lock);
100+
return this->m_reopen;
101+
}
102+
83103
SocketIpStatus SocketComponentHelper::reopen() {
84104
SocketIpStatus status = SOCK_SUCCESS;
85105
if (not this->isOpened()) {
86106
// Check for auto-open before attempting to reopen
87-
bool reopen = false;
88-
{
89-
Os::ScopeLock scopedLock(this->m_lock);
90-
reopen = this->m_reopen;
91-
}
92-
// Open a network connection if it has not already been open
107+
bool reopen = this->getAutomaticOpen();
93108
if (not reopen) {
94109
status = SOCK_AUTO_CONNECT_DISABLED;
110+
// Open a network connection if it has not already been open
95111
} else {
96112
status = this->open();
97113
if (status == SocketIpStatus::SOCK_ANOTHER_THREAD_OPENING) {
@@ -109,15 +125,16 @@ SocketIpStatus SocketComponentHelper::send(const U8* const data, const FwSizeTyp
109125
this->m_lock.unlock();
110126
// Prevent transmission before connection, or after a disconnect
111127
if (descriptor.fd == -1) {
112-
status = this->reopen();
113-
// if reopen wasn't successful, pass the that up to the caller
114-
if (status != SOCK_SUCCESS) {
115-
return status;
128+
this->requestReconnect();
129+
SocketIpStatus reconnectStat = this->waitForReconnect();
130+
if (reconnectStat == SOCK_SUCCESS) {
131+
// Refresh local copy after reopen
132+
this->m_lock.lock();
133+
descriptor = this->m_descriptor;
134+
this->m_lock.unlock();
135+
} else {
136+
return reconnectStat;
116137
}
117-
// Refresh local copy after reopen
118-
this->m_lock.lock();
119-
descriptor = this->m_descriptor;
120-
this->m_lock.unlock();
121138
}
122139
status = this->getSocketHandler().send(descriptor, data, size);
123140
if (status == SOCK_DISCONNECTED) {
@@ -138,8 +155,15 @@ void SocketComponentHelper::close() {
138155
this->m_open = OpenState::NOT_OPEN;
139156
}
140157

158+
/* Read Thread */
159+
141160
Os::Task::Status SocketComponentHelper::join() {
142-
return m_task.join();
161+
Os::Task::Status stat = m_task.join();
162+
Os::Task::Status reconnectStat = this->joinReconnect();
163+
if (stat == Os::Task::Status::OP_OK) {
164+
return reconnectStat;
165+
}
166+
return stat;
143167
}
144168

145169
void SocketComponentHelper::stop() {
@@ -148,6 +172,7 @@ void SocketComponentHelper::stop() {
148172
Os::ScopeLock scopeLock(m_lock);
149173
this->m_stop = true;
150174
}
175+
this->stopReconnect();
151176
this->shutdown(); // Break out of any receives and fully shutdown
152177
}
153178

@@ -178,17 +203,12 @@ void SocketComponentHelper::readLoop() {
178203
do {
179204
// Prevent transmission before connection, or after a disconnect
180205
if ((not this->isOpened()) and this->running()) {
181-
status = this->reopen();
206+
this->requestReconnect();
207+
status = this->waitForReconnect();
182208
// When reopen is disabled, just break as this is a exit condition for the loop
183209
if (status == SOCK_AUTO_CONNECT_DISABLED) {
184210
break;
185211
}
186-
// If the reconnection failed in any other way, warn, wait, and retry
187-
else if (status != SOCK_SUCCESS) {
188-
Fw::Logger::log("[WARNING] Failed to open port with status %d and errno %d\n", status, errno);
189-
(void)Os::Task::delay(SOCKET_RETRY_INTERVAL);
190-
continue;
191-
}
192212
}
193213
// If the network connection is open, read from it
194214
if (this->isOpened() and this->running()) {
@@ -221,4 +241,123 @@ void SocketComponentHelper::readTask(void* pointer) {
221241
SocketComponentHelper* self = reinterpret_cast<SocketComponentHelper*>(pointer);
222242
self->readLoop();
223243
}
244+
245+
/* Reconnect Thread */
246+
247+
Os::Task::Status SocketComponentHelper::joinReconnect() {
248+
return m_reconnectTask.join();
249+
}
250+
251+
void SocketComponentHelper::stopReconnect() {
252+
Os::ScopeLock scopeLock(this->m_reconnectLock);
253+
this->m_reconnectState = ReconnectState::NOT_RECONNECTING;
254+
this->m_reconnectStop = true;
255+
}
256+
257+
bool SocketComponentHelper::runningReconnect() {
258+
Os::ScopeLock scopedLock(this->m_reconnectLock);
259+
bool running = not this->m_reconnectStop;
260+
return running;
261+
}
262+
263+
void SocketComponentHelper::reconnectLoop() {
264+
SocketIpStatus status = SOCK_SUCCESS;
265+
while (this->runningReconnect()) {
266+
// Check if we need to reconnect
267+
bool reconnect = false;
268+
{
269+
Os::ScopeLock scopedLock(this->m_reconnectLock);
270+
if (this->m_reconnectState == ReconnectState::REQUEST_RECONNECT) {
271+
this->m_reconnectState = ReconnectState::RECONNECT_IN_PROGRESS;
272+
reconnect = true;
273+
274+
}
275+
// If we were already in or are now in RECONNECT_IN_PROGRESS we
276+
// need to try to reconnect, again
277+
else if (this->m_reconnectState == ReconnectState::RECONNECT_IN_PROGRESS) {
278+
reconnect = true;
279+
}
280+
}
281+
282+
if (reconnect) {
283+
status = this->reopen();
284+
285+
// Reopen Case 1: Auto Connect is disabled, so no longer
286+
// try to reconnect
287+
if (status == SOCK_AUTO_CONNECT_DISABLED) {
288+
Os::ScopeLock scopedLock(this->m_reconnectLock);
289+
this->m_reconnectState = ReconnectState::NOT_RECONNECTING;
290+
}
291+
// Reopen Case 2: Success, so no longer
292+
// try to reconnect
293+
else if (status == SOCK_SUCCESS) {
294+
Os::ScopeLock scopedLock(this->m_reconnectLock);
295+
this->m_reconnectState = ReconnectState::NOT_RECONNECTING;
296+
}
297+
// Reopen Case 3: Keep trying to reconnect - NO reconnect
298+
// state change
299+
else {
300+
Fw::Logger::log("[WARNING] Failed to open port with status %d and errno %d\n", status, errno);
301+
(void)Os::Task::delay(SOCKET_RETRY_INTERVAL);
302+
}
303+
} else {
304+
// After a brief delay, we will loop again
305+
(void)Os::Task::delay(this->m_reconnectCheckInterval);
306+
}
307+
}
308+
}
309+
310+
void SocketComponentHelper::reconnectTask(void* pointer) {
311+
FW_ASSERT(pointer);
312+
SocketComponentHelper* self = reinterpret_cast<SocketComponentHelper*>(pointer);
313+
self->reconnectLoop();
314+
}
315+
316+
void SocketComponentHelper::requestReconnect() {
317+
Os::ScopeLock scopedLock(this->m_reconnectLock);
318+
if (m_reconnectState == ReconnectState::NOT_RECONNECTING) {
319+
m_reconnectState = ReconnectState::REQUEST_RECONNECT;
320+
}
321+
return;
322+
}
323+
324+
SocketIpStatus SocketComponentHelper::waitForReconnect(Fw::TimeInterval timeout) {
325+
// Do not attempt to reconnect if auto reconnect config flag is disabled
326+
if (!this->getAutomaticOpen()) {
327+
return SOCK_AUTO_CONNECT_DISABLED;
328+
}
329+
330+
Fw::TimeInterval elapsed = Fw::TimeInterval(0, 0);
331+
332+
while (elapsed < timeout) {
333+
// If the reconnect thread is NOT reconnecting, we are done waiting
334+
// If we are no longer running the reconnect thread, we are done waiting
335+
{
336+
Os::ScopeLock scopedLock(this->m_reconnectLock);
337+
if (this->m_reconnectState == ReconnectState::NOT_RECONNECTING) {
338+
break;
339+
}
340+
if (this->m_reconnectStop) {
341+
break;
342+
}
343+
}
344+
// Wait a bit before checking again
345+
(void)Os::Task::delay(this->m_reconnectWaitInterval);
346+
elapsed.add(this->m_reconnectWaitInterval.getSeconds(), this->m_reconnectWaitInterval.getUSeconds());
347+
}
348+
349+
// If we have completed our loop, check if we are connected or if
350+
// auto connect was disabled during our wait
351+
if (this->isOpened()) {
352+
return SOCK_SUCCESS;
353+
}
354+
355+
// Check one more time if auto reconnect config flag got disabled
356+
if (!this->getAutomaticOpen()) {
357+
return SOCK_AUTO_CONNECT_DISABLED;
358+
}
359+
360+
return SOCK_DISCONNECTED; // Indicates failure of this attempt, another reopen needed
361+
}
362+
224363
} // namespace Drv

0 commit comments

Comments
 (0)