Skip to content

Commit 88881e9

Browse files
godexsoftkuznetsss
andauthored
chore: TSAN fix async-signal-unsafe (#2824)
Co-authored-by: Sergey Kuznetsov <skuznetsov@ripple.com>
1 parent 94e70e4 commit 88881e9

File tree

7 files changed

+193
-63
lines changed

7 files changed

+193
-63
lines changed

src/app/ClioApplication.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ ClioApplication::ClioApplication(util::config::ClioConfigDefinition const& confi
9191
{
9292
LOG(util::LogService::info()) << "Clio version: " << util::build::getClioFullVersionString();
9393
signalsHandler_.subscribeToStop([this]() { appStopper_.stop(); });
94+
appStopper_.setOnComplete([this]() { signalsHandler_.notifyGracefulShutdownComplete(); });
9495
}
9596

9697
int

src/app/Stopper.cpp

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,18 @@ Stopper::~Stopper()
3838
void
3939
Stopper::setOnStop(std::function<void(boost::asio::yield_context)> cb)
4040
{
41-
util::spawn(ctx_, std::move(cb));
41+
util::spawn(ctx_, [this, cb = std::move(cb)](auto yield) {
42+
cb(yield);
43+
44+
if (onCompleteCallback_)
45+
onCompleteCallback_();
46+
});
47+
}
48+
49+
void
50+
Stopper::setOnComplete(std::function<void()> cb)
51+
{
52+
onCompleteCallback_ = std::move(cb);
4253
}
4354

4455
void

src/app/Stopper.hpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ namespace app {
4343
class Stopper {
4444
boost::asio::io_context ctx_;
4545
std::thread worker_;
46+
std::function<void()> onCompleteCallback_;
4647

4748
public:
4849
/**
@@ -58,6 +59,14 @@ class Stopper {
5859
void
5960
setOnStop(std::function<void(boost::asio::yield_context)> cb);
6061

62+
/**
63+
* @brief Set the callback to be called when graceful shutdown completes.
64+
*
65+
* @param cb The callback to be called when shutdown completes.
66+
*/
67+
void
68+
setOnComplete(std::function<void()> cb);
69+
6170
/**
6271
* @brief Stop the application and run the shutdown tasks.
6372
*/

src/util/SignalsHandler.cpp

Lines changed: 91 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,13 @@
2323
#include "util/config/ConfigDefinition.hpp"
2424
#include "util/log/Logger.hpp"
2525

26+
#include <atomic>
2627
#include <chrono>
28+
#include <condition_variable>
2729
#include <csignal>
2830
#include <functional>
29-
#include <optional>
31+
#include <mutex>
32+
#include <thread>
3033
#include <utility>
3134

3235
namespace util {
@@ -50,17 +53,11 @@ class SignalsHandlerStatic {
5053
}
5154

5255
static void
53-
handleSignal(int signal)
56+
handleSignal(int /* signal */)
5457
{
5558
ASSERT(installedHandler != nullptr, "SignalsHandler is not initialized");
56-
installedHandler->stopHandler_(signal);
57-
}
58-
59-
static void
60-
handleSecondSignal(int signal)
61-
{
62-
ASSERT(installedHandler != nullptr, "SignalsHandler is not initialized");
63-
installedHandler->secondSignalHandler_(signal);
59+
installedHandler->signalReceived_ = true;
60+
installedHandler->cv_.notify_one();
6461
}
6562
};
6663

@@ -69,56 +66,109 @@ SignalsHandler* SignalsHandlerStatic::installedHandler = nullptr;
6966
} // namespace impl
7067

7168
SignalsHandler::SignalsHandler(config::ClioConfigDefinition const& config, std::function<void()> forceExitHandler)
72-
: gracefulPeriod_(0)
73-
, context_(1)
74-
, stopHandler_([this, forceExitHandler](int) mutable {
75-
LOG(LogService::info()) << "Got stop signal. Stopping Clio. Graceful period is "
76-
<< std::chrono::duration_cast<std::chrono::milliseconds>(gracefulPeriod_).count()
77-
<< " milliseconds.";
78-
setHandler(impl::SignalsHandlerStatic::handleSecondSignal);
79-
timer_.emplace(context_.scheduleAfter(
80-
gracefulPeriod_, [forceExitHandler = std::move(forceExitHandler)](auto&& stopToken, bool canceled) {
81-
// TODO: Update this after https://github.com/XRPLF/clio/issues/1380
82-
if (not stopToken.isStopRequested() and not canceled) {
83-
LOG(LogService::warn()) << "Force exit at the end of graceful period.";
84-
forceExitHandler();
85-
}
86-
}
87-
));
88-
stopSignal_();
89-
})
90-
, secondSignalHandler_([this, forceExitHandler = std::move(forceExitHandler)](int) {
91-
LOG(LogService::warn()) << "Force exit on second signal.";
92-
forceExitHandler();
93-
cancelTimer();
94-
setHandler();
95-
})
69+
: gracefulPeriod_(util::config::ClioConfigDefinition::toMilliseconds(config.get<float>("graceful_period")))
70+
, forceExitHandler_(std::move(forceExitHandler))
9671
{
9772
impl::SignalsHandlerStatic::registerHandler(*this);
98-
99-
gracefulPeriod_ = util::config::ClioConfigDefinition::toMilliseconds(config.get<float>("graceful_period"));
73+
workerThread_ = std::thread([this]() { runStateMachine(); });
10074
setHandler(impl::SignalsHandlerStatic::handleSignal);
10175
}
10276

10377
SignalsHandler::~SignalsHandler()
10478
{
105-
cancelTimer();
10679
setHandler();
80+
81+
state_ = State::NormalExit;
82+
cv_.notify_one();
83+
84+
if (workerThread_.joinable())
85+
workerThread_.join();
86+
10787
impl::SignalsHandlerStatic::resetHandler(); // This is needed mostly for tests to reset static state
10888
}
10989

11090
void
111-
SignalsHandler::cancelTimer()
91+
SignalsHandler::notifyGracefulShutdownComplete()
11292
{
113-
if (timer_.has_value())
114-
timer_->abort();
93+
if (state_ == State::GracefulShutdown) {
94+
LOG(LogService::info()) << "Graceful shutdown completed successfully.";
95+
state_ = State::NormalExit;
96+
cv_.notify_one();
97+
}
11598
}
11699

117100
void
118101
SignalsHandler::setHandler(void (*handler)(int))
119102
{
120-
for (int const signal : kHANDLED_SIGNALS) {
103+
for (int const signal : kHANDLED_SIGNALS)
121104
std::signal(signal, handler == nullptr ? SIG_DFL : handler);
105+
}
106+
107+
void
108+
SignalsHandler::runStateMachine()
109+
{
110+
while (state_ != State::NormalExit) {
111+
auto currentState = state_.load();
112+
113+
switch (currentState) {
114+
case State::WaitingForSignal: {
115+
{
116+
std::unique_lock<std::mutex> lock(mutex_);
117+
cv_.wait(lock, [this]() { return signalReceived_ or state_ == State::NormalExit; });
118+
}
119+
120+
if (state_ == State::NormalExit)
121+
return;
122+
123+
LOG(
124+
LogService::info()
125+
) << "Got stop signal. Stopping Clio. Graceful period is "
126+
<< std::chrono::duration_cast<std::chrono::milliseconds>(gracefulPeriod_).count() << " milliseconds.";
127+
128+
state_ = State::GracefulShutdown;
129+
signalReceived_ = false;
130+
131+
stopSignal_();
132+
break;
133+
}
134+
135+
case State::GracefulShutdown: {
136+
bool waitResult = false;
137+
{
138+
std::unique_lock<std::mutex> lock(mutex_);
139+
140+
// Wait for either:
141+
// 1. Graceful period to elapse (timeout)
142+
// 2. Another signal (signalReceived_)
143+
// 3. Graceful shutdown completion (state changes to NormalExit)
144+
waitResult = cv_.wait_for(lock, gracefulPeriod_, [this]() {
145+
return signalReceived_ or state_ == State::NormalExit;
146+
});
147+
}
148+
149+
if (state_ == State::NormalExit)
150+
break;
151+
152+
if (signalReceived_) {
153+
LOG(LogService::warn()) << "Force exit on second signal.";
154+
state_ = State::ForceExit;
155+
signalReceived_ = false;
156+
} else if (not waitResult) {
157+
LOG(LogService::warn()) << "Force exit at the end of graceful period.";
158+
state_ = State::ForceExit;
159+
}
160+
break;
161+
}
162+
163+
case State::ForceExit: {
164+
forceExitHandler_();
165+
state_ = State::NormalExit;
166+
break;
167+
}
168+
169+
case State::NormalExit:
170+
return;
171+
}
122172
}
123173
}
124174

src/util/SignalsHandler.hpp

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,20 @@
1919

2020
#pragma once
2121

22-
#include "util/async/context/BasicExecutionContext.hpp"
2322
#include "util/config/ConfigDefinition.hpp"
2423
#include "util/log/Logger.hpp"
2524

26-
#include <boost/asio/executor_work_guard.hpp>
27-
#include <boost/asio/io_context.hpp>
28-
#include <boost/asio/steady_timer.hpp>
2925
#include <boost/signals2/signal.hpp>
30-
#include <boost/signals2/variadic_signal.hpp>
3126

27+
#include <atomic>
3228
#include <chrono>
3329
#include <concepts>
30+
#include <condition_variable>
3431
#include <csignal>
3532
#include <cstdlib>
3633
#include <functional>
37-
#include <optional>
34+
#include <mutex>
35+
#include <thread>
3836

3937
namespace util {
4038
namespace impl {
@@ -48,13 +46,22 @@ class SignalsHandlerStatic;
4846
* @note There could be only one instance of this class.
4947
*/
5048
class SignalsHandler {
49+
/**
50+
* @brief States of the signal handler state machine.
51+
*/
52+
enum class State { WaitingForSignal, GracefulShutdown, ForceExit, NormalExit };
53+
5154
std::chrono::steady_clock::duration gracefulPeriod_;
52-
async::PoolExecutionContext context_;
53-
std::optional<async::PoolExecutionContext::ScheduledOperation<void>> timer_;
55+
std::function<void()> forceExitHandler_;
5456

5557
boost::signals2::signal<void()> stopSignal_;
56-
std::function<void(int)> stopHandler_;
57-
std::function<void(int)> secondSignalHandler_;
58+
59+
std::atomic<bool> signalReceived_{false};
60+
std::atomic<State> state_{State::WaitingForSignal};
61+
62+
std::mutex mutex_;
63+
std::condition_variable cv_;
64+
std::thread workerThread_;
5865

5966
friend class impl::SignalsHandlerStatic;
6067

@@ -101,15 +108,16 @@ class SignalsHandler {
101108
stopSignal_.connect(static_cast<int>(priority), std::forward<SomeCallback>(callback));
102109
}
103110

104-
static constexpr auto kHANDLED_SIGNALS = {SIGINT, SIGTERM};
105-
106-
private:
107111
/**
108-
* @brief Cancel scheduled force exit if any.
112+
* @brief Notify the signal handler that graceful shutdown has completed.
113+
* This allows the handler to transition to NormalExit state.
109114
*/
110115
void
111-
cancelTimer();
116+
notifyGracefulShutdownComplete();
112117

118+
static constexpr auto kHANDLED_SIGNALS = {SIGINT, SIGTERM};
119+
120+
private:
113121
/**
114122
* @brief Set signal handler for handled signals.
115123
*
@@ -118,6 +126,12 @@ class SignalsHandler {
118126
static void
119127
setHandler(void (*handler)(int) = nullptr);
120128

129+
/**
130+
* @brief Run the state machine loop in a worker thread.
131+
*/
132+
void
133+
runStateMachine();
134+
121135
static constexpr auto kDEFAULT_FORCE_EXIT_HANDLER = []() { std::exit(EXIT_FAILURE); };
122136
};
123137

tests/unit/app/StopperTests.cpp

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ struct StopperTest : virtual public ::testing::Test {
4040
protected:
4141
// Order here is important, stopper_ should die before mockCallback_, otherwise UB
4242
testing::StrictMock<testing::MockFunction<void(boost::asio::yield_context)>> mockCallback_;
43+
testing::StrictMock<testing::MockFunction<void()>> mockCompleteCallback_;
4344
Stopper stopper_;
4445
};
4546

@@ -60,6 +61,22 @@ TEST_F(StopperTest, stopCalledMultipleTimes)
6061
stopper_.stop();
6162
}
6263

64+
TEST_F(StopperTest, stopCallsCompletionCallback)
65+
{
66+
stopper_.setOnStop(mockCallback_.AsStdFunction());
67+
stopper_.setOnComplete(mockCompleteCallback_.AsStdFunction());
68+
EXPECT_CALL(mockCallback_, Call);
69+
EXPECT_CALL(mockCompleteCallback_, Call);
70+
stopper_.stop();
71+
}
72+
73+
TEST_F(StopperTest, stopWithoutCompletionCallback)
74+
{
75+
stopper_.setOnStop(mockCallback_.AsStdFunction());
76+
EXPECT_CALL(mockCallback_, Call);
77+
stopper_.stop();
78+
}
79+
6380
struct StopperMakeCallbackTest : util::prometheus::WithPrometheus, SyncAsioContextTest {
6481
struct ServerMock : web::ServerTag {
6582
MOCK_METHOD(void, stop, (boost::asio::yield_context), ());

0 commit comments

Comments
 (0)