Skip to content

Commit b62cfe9

Browse files
authored
feat: Graceful shutdown with old web server (#2786)
- Stop accepting connections during graceful shutdown in the old web server - Stop all the services before Clio exits - Move cache saving into stop callback
1 parent 56f074e commit b62cfe9

17 files changed

+414
-47
lines changed

src/app/ClioApplication.cpp

Lines changed: 11 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include "data/AmendmentCenter.hpp"
2626
#include "data/BackendFactory.hpp"
2727
#include "data/LedgerCache.hpp"
28+
#include "data/LedgerCacheSaver.hpp"
2829
#include "etl/ETLService.hpp"
2930
#include "etl/LoadBalancer.hpp"
3031
#include "etl/NetworkValidatedLedgers.hpp"
@@ -98,36 +99,23 @@ ClioApplication::run(bool const useNgWebServer)
9899
auto const threads = config_.get<uint16_t>("io_threads");
99100
LOG(util::LogService::info()) << "Number of io threads = " << threads;
100101

101-
// IO context to handle all incoming requests, as well as other things.
102-
// This is not the only io context in the application.
103-
boost::asio::io_context ioc{threads};
104-
105102
// Similarly we need a context to run ETL on
106103
// In the future we can remove the raw ioc and use ctx instead
104+
// This context must be declared before ioc (and is therefore destroyed after it) because tasks running inside ioc hold references to it
107105
util::async::CoroExecutionContext ctx{threads};
108106

107+
// IO context to handle all incoming requests, as well as other things.
108+
// This is not the only io context in the application.
109+
boost::asio::io_context ioc{threads};
110+
109111
// Rate limiter, to prevent abuse
110112
auto whitelistHandler = web::dosguard::WhitelistHandler{config_};
111113
auto const dosguardWeights = web::dosguard::Weights::make(config_);
112114
auto dosGuard = web::dosguard::DOSGuard{config_, whitelistHandler, dosguardWeights};
113115
auto sweepHandler = web::dosguard::IntervalSweepHandler{config_, ioc, dosGuard};
114116

115117
auto cache = data::LedgerCache{};
116-
appStopper_.setOnStop([&cache, this](auto&&) {
117-
// TODO(kuznetsss): move this into Stopper::makeOnStopCallback()
118-
auto const cacheFilePath = config_.maybeValue<std::string>("cache.file.path");
119-
if (not cacheFilePath.has_value()) {
120-
return;
121-
}
122-
123-
LOG(util::LogService::info()) << "Saving ledger cache to " << *cacheFilePath;
124-
if (auto const [success, duration_ms] = util::timed([&]() { return cache.saveToFile(*cacheFilePath); });
125-
success.has_value()) {
126-
LOG(util::LogService::info()) << "Successfully saved ledger cache in " << duration_ms << " ms";
127-
} else {
128-
LOG(util::LogService::error()) << "Error saving LedgerCache to file";
129-
}
130-
});
118+
auto cacheSaver = data::LedgerCacheSaver{config_, cache};
131119

132120
// Interface to the database
133121
auto backend = data::makeBackend(config_, cache);
@@ -208,7 +196,7 @@ ClioApplication::run(bool const useNgWebServer)
208196
}
209197

210198
appStopper_.setOnStop(
211-
Stopper::makeOnStopCallback(httpServer.value(), *balancer, *etl, *subscriptions, *backend, ioc)
199+
Stopper::makeOnStopCallback(httpServer.value(), *balancer, *etl, *subscriptions, *backend, cacheSaver, ioc)
212200
);
213201

214202
// Blocks until stopped.
@@ -223,6 +211,9 @@ ClioApplication::run(bool const useNgWebServer)
223211
auto handler = std::make_shared<web::RPCServerHandler<RPCEngineType>>(config_, backend, rpcEngine, etl, dosGuard);
224212

225213
auto const httpServer = web::makeHttpServer(config_, ioc, dosGuard, handler, cache);
214+
appStopper_.setOnStop(
215+
Stopper::makeOnStopCallback(*httpServer, *balancer, *etl, *subscriptions, *backend, cacheSaver, ioc)
216+
);
226217

227218
// Blocks until stopped.
228219
// When stopped, shared_ptrs fall out of scope

src/app/Stopper.hpp

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,13 @@
2020
#pragma once
2121

2222
#include "data/BackendInterface.hpp"
23+
#include "data/LedgerCacheSaver.hpp"
2324
#include "etl/ETLServiceInterface.hpp"
2425
#include "etl/LoadBalancerInterface.hpp"
2526
#include "feed/SubscriptionManagerInterface.hpp"
2627
#include "util/CoroutineGroup.hpp"
2728
#include "util/log/Logger.hpp"
28-
#include "web/ng/Server.hpp"
29+
#include "web/interface/Concepts.hpp"
2930

3031
#include <boost/asio/executor_work_guard.hpp>
3132
#include <boost/asio/io_context.hpp>
@@ -71,21 +72,25 @@ class Stopper {
7172
* @param etl The ETL service to stop.
7273
* @param subscriptions The subscription manager to stop.
7374
* @param backend The backend to stop.
75+
* @param cacheSaver The ledger cache saver; its save() is started when the callback begins and waited for before the io_context is stopped.
7476
* @param ioc The io_context to stop.
7577
* @return The callback to be called on application stop.
7678
*/
77-
template <web::ng::SomeServer ServerType>
79+
template <web::SomeServer ServerType, data::SomeLedgerCacheSaver LedgerCacheSaverType>
7880
static std::function<void(boost::asio::yield_context)>
7981
makeOnStopCallback(
8082
ServerType& server,
8183
etl::LoadBalancerInterface& balancer,
8284
etl::ETLServiceInterface& etl,
8385
feed::SubscriptionManagerInterface& subscriptions,
8486
data::BackendInterface& backend,
87+
LedgerCacheSaverType& cacheSaver,
8588
boost::asio::io_context& ioc
8689
)
8790
{
8891
return [&](boost::asio::yield_context yield) {
92+
cacheSaver.save();
93+
8994
util::CoroutineGroup coroutineGroup{yield};
9095
coroutineGroup.spawn(yield, [&server](auto innerYield) {
9196
server.stop(innerYield);
@@ -106,6 +111,8 @@ class Stopper {
106111
backend.waitForWritesToFinish();
107112
LOG(util::LogService::info()) << "Backend writes finished";
108113

114+
cacheSaver.waitToFinish();
115+
109116
ioc.stop();
110117
LOG(util::LogService::info()) << "io_context stopped";
111118

src/data/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ target_sources(
55
BackendCounters.cpp
66
BackendInterface.cpp
77
LedgerCache.cpp
8+
LedgerCacheSaver.cpp
89
LedgerHeaderCache.cpp
910
cassandra/impl/Future.cpp
1011
cassandra/impl/Cluster.cpp

src/data/LedgerCache.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ LedgerCache::saveToFile(std::string const& path) const
265265
}
266266

267267
impl::LedgerCacheFile file{path};
268-
std::unique_lock const lock{mtx_};
268+
std::shared_lock const lock{mtx_};
269269
impl::LedgerCacheFile::DataView const data{.latestSeq = latestSeq_, .map = map_, .deleted = deleted_};
270270
return file.write(data);
271271
}

src/data/LedgerCache.hpp

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -145,15 +145,8 @@ class LedgerCache : public LedgerCacheInterface {
145145
void
146146
waitUntilCacheContainsSeq(uint32_t seq) override;
147147

148-
/**
149-
* @brief Save the cache to file
150-
* @note This operation takes about 7 seconds and it keeps mtx_ exclusively locked
151-
*
152-
* @param path The file path to save the cache to
153-
* @return An error as a string if any
154-
*/
155148
std::expected<void, std::string>
156-
saveToFile(std::string const& path) const;
149+
saveToFile(std::string const& path) const override;
157150

158151
std::expected<void, std::string>
159152
loadFromFile(std::string const& path, uint32_t minLatestSequence) override;

src/data/LedgerCacheInterface.hpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,16 @@ class LedgerCacheInterface {
171171
virtual void
172172
waitUntilCacheContainsSeq(uint32_t seq) = 0;
173173

174+
/**
175+
* @brief Save the cache to file
176+
* @note This operation takes about 7 seconds and it keeps a shared lock of mtx_
177+
*
178+
* @param path The file path to save the cache to
179+
* @return An error as a string if any
180+
*/
181+
[[nodiscard]] virtual std::expected<void, std::string>
182+
saveToFile(std::string const& path) const = 0;
183+
174184
/**
175185
* @brief Load the cache from file
176186
* @note This operation takes about 7 seconds and it keeps mtx_ exclusively locked

src/data/LedgerCacheSaver.cpp

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
//------------------------------------------------------------------------------
2+
/*
3+
This file is part of clio: https://github.com/XRPLF/clio
4+
Copyright (c) 2025, the clio developers.
5+
6+
Permission to use, copy, modify, and distribute this software for any
7+
purpose with or without fee is hereby granted, provided that the above
8+
copyright notice and this permission notice appear in all copies.
9+
10+
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11+
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12+
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13+
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14+
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15+
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16+
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17+
*/
18+
//==============================================================================
19+
20+
#include "data/LedgerCacheSaver.hpp"
21+
22+
#include "data/LedgerCacheInterface.hpp"
23+
#include "util/Assert.hpp"
24+
#include "util/Profiler.hpp"
25+
#include "util/log/Logger.hpp"
26+
27+
#include <string>
28+
#include <thread>
29+
30+
namespace data {
31+
32+
LedgerCacheSaver::LedgerCacheSaver(util::config::ClioConfigDefinition const& config, LedgerCacheInterface const& cache)
33+
: cacheFilePath_(config.maybeValue<std::string>("cache.file.path")), cache_(cache)
34+
{
35+
}
36+
37+
// Joins the saving thread (if one is still attached) so that it never outlives this object.
LedgerCacheSaver::~LedgerCacheSaver()
{
    this->waitToFinish();
}
41+
42+
void
43+
LedgerCacheSaver::save()
44+
{
45+
ASSERT(not savingThread_.has_value(), "Multiple save() calls are not allowed");
46+
savingThread_ = std::thread([this]() {
47+
if (not cacheFilePath_.has_value()) {
48+
return;
49+
}
50+
51+
LOG(util::LogService::info()) << "Saving ledger cache to " << *cacheFilePath_;
52+
if (auto const [success, durationMs] = util::timed([&]() { return cache_.get().saveToFile(*cacheFilePath_); });
53+
success.has_value()) {
54+
LOG(util::LogService::info()) << "Successfully saved ledger cache in " << durationMs << " ms";
55+
} else {
56+
LOG(util::LogService::error()) << "Error saving LedgerCache to file";
57+
}
58+
});
59+
}
60+
61+
void
62+
LedgerCacheSaver::waitToFinish()
63+
{
64+
if (savingThread_.has_value() and savingThread_->joinable()) {
65+
savingThread_->join();
66+
}
67+
savingThread_.reset();
68+
}
69+
70+
} // namespace data

src/data/LedgerCacheSaver.hpp

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
//------------------------------------------------------------------------------
2+
/*
3+
This file is part of clio: https://github.com/XRPLF/clio
4+
Copyright (c) 2025, the clio developers.
5+
6+
Permission to use, copy, modify, and distribute this software for any
7+
purpose with or without fee is hereby granted, provided that the above
8+
copyright notice and this permission notice appear in all copies.
9+
10+
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11+
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12+
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13+
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14+
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15+
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16+
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17+
*/
18+
//==============================================================================
19+
20+
#pragma once
21+
22+
#include "data/LedgerCacheInterface.hpp"
23+
#include "util/config/ConfigDefinition.hpp"
24+
25+
#include <concepts>
26+
#include <functional>
27+
#include <optional>
28+
#include <string>
29+
#include <thread>
30+
31+
namespace data {
32+
33+
/**
34+
* @brief A concept for a class that can save ledger cache asynchronously.
35+
*
36+
* This concept defines the interface requirements for any type that manages
37+
* asynchronous saving of ledger cache to persistent storage.
38+
*/
39+
template <typename T>
40+
concept SomeLedgerCacheSaver = requires(T a) {
41+
{ a.save() } -> std::same_as<void>;
42+
{ a.waitToFinish() } -> std::same_as<void>;
43+
};
44+
45+
/**
46+
* @brief Manages asynchronous saving of ledger cache to a file.
47+
*
48+
* This class provides functionality to save the ledger cache to a file in a separate thread,
49+
* allowing the main application to continue without blocking. The file path is configured
50+
* through the application's configuration system.
51+
*/
52+
class LedgerCacheSaver {
53+
std::optional<std::string> cacheFilePath_;
54+
std::reference_wrapper<LedgerCacheInterface const> cache_;
55+
std::optional<std::thread> savingThread_;
56+
57+
public:
58+
/**
59+
* @brief Constructs a LedgerCacheSaver instance.
60+
*
61+
* @param config The configuration object containing the cache file path setting
62+
* @param cache Reference to the ledger cache interface to be saved
63+
*/
64+
LedgerCacheSaver(util::config::ClioConfigDefinition const& config, LedgerCacheInterface const& cache);
65+
66+
/**
67+
* @brief Destructor that ensures the saving thread is properly joined.
68+
*
69+
* Waits for any ongoing save operation to complete before destruction.
70+
*/
71+
~LedgerCacheSaver();
72+
73+
/**
74+
* @brief Initiates an asynchronous save operation of the ledger cache.
75+
*
76+
* Spawns a new thread that saves the ledger cache to the configured file path.
77+
* If no file path is configured, the operation is skipped. Logs the progress
78+
* and result of the save operation.
79+
*/
80+
void
81+
save();
82+
83+
/**
84+
* @brief Waits for the saving thread to complete.
85+
*
86+
* Blocks until the saving operation finishes if a thread is currently active.
87+
* Safe to call multiple times or when no save operation is in progress.
88+
*/
89+
void
90+
waitToFinish();
91+
};
92+
93+
} // namespace data

src/util/StopHelper.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
#include "util/StopHelper.hpp"
2121

22+
#include "util/Spawn.hpp"
23+
2224
#include <boost/asio/spawn.hpp>
2325
#include <boost/asio/steady_timer.hpp>
2426

@@ -37,7 +39,7 @@ void
3739
StopHelper::asyncWaitForStop(boost::asio::yield_context yield)
3840
{
3941
boost::asio::steady_timer timer{yield.get_executor(), std::chrono::steady_clock::duration::max()};
40-
onStopReady_.connect([&timer]() { timer.cancel(); });
42+
onStopReady_.connect([&]() { util::spawn(yield, [&timer](auto&&) { timer.cancel(); }); });
4143
boost::system::error_code error;
4244
if (!*stopped_)
4345
timer.async_wait(yield[error]);

src/util/StopHelper.hpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,16 @@ class StopHelper {
3636
std::unique_ptr<std::atomic_bool> stopped_ = std::make_unique<std::atomic_bool>(false);
3737

3838
public:
39+
StopHelper() = default;
40+
~StopHelper() = default;
41+
42+
StopHelper(StopHelper&&) = delete;
43+
StopHelper&
44+
operator=(StopHelper&&) = delete;
45+
StopHelper(StopHelper const&) = delete;
46+
StopHelper&
47+
operator=(StopHelper const&) = delete;
48+
3949
/**
4050
* @brief Notify that the class is ready to stop.
4151
*/

0 commit comments

Comments
 (0)