Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
ad8c588
Fix some bugs
kuznetsss Nov 19, 2025
19b4550
Merge branch 'develop' into 1974_Choose_writer_from_communication
kuznetsss Nov 20, 2025
db86ff9
Change flags in etl SystemState
kuznetsss Nov 20, 2025
1b00f28
Add db role to ClioNode
kuznetsss Nov 21, 2025
a32177a
Add WriterState
kuznetsss Nov 24, 2025
49c3eb4
Refactored ClusterCommunicationService
kuznetsss Nov 25, 2025
a68a874
Fix docs
kuznetsss Nov 25, 2025
ff7cc6a
Add metrics subservice
kuznetsss Nov 25, 2025
0e514d7
Move implementation to cpp and add docs
kuznetsss Nov 26, 2025
e84cf0a
Add WriterDecider
kuznetsss Nov 26, 2025
6181abc
Create cpp file and add docs
kuznetsss Nov 26, 2025
f855d8d
Add tests for ClioNode
kuznetsss Nov 26, 2025
e8409a3
WIP
kuznetsss Nov 26, 2025
8fa1cb9
Add tests for RepeatedTask
kuznetsss Nov 27, 2025
60e3ce0
Add Backend tests
kuznetsss Dec 2, 2025
55ba0ab
Add tests for Metrics
kuznetsss Dec 2, 2025
4697b1d
Add tests for WriterDecider
kuznetsss Dec 3, 2025
5758575
Add tests for ClusterCommunicationService
kuznetsss Dec 3, 2025
4818704
Merge branch 'develop' into 1974_Choose_writer_from_communication
kuznetsss Dec 3, 2025
da5008b
Fix typo in variable name
kuznetsss Dec 3, 2025
e16efbf
Add clusterCommunicationService to graceful shutdown
kuznetsss Dec 4, 2025
fde125f
Merge branch 'develop' into 1974_Choose_writer_from_communication
kuznetsss Dec 10, 2025
847ed98
Add writing command to etl::SystemState
kuznetsss Jan 8, 2026
fd01dbb
Improve channels
kuznetsss Jan 8, 2026
c9cdcd0
Add async framework support in Channel
kuznetsss Jan 8, 2026
d71c466
Apply bug fixes
kuznetsss Jan 8, 2026
4573575
Add tests
kuznetsss Jan 9, 2026
4599a70
Run pre-commit
kuznetsss Jan 9, 2026
baddeff
Merge and fix build
kuznetsss Jan 13, 2026
a64c8c4
Add fallback writer role
kuznetsss Jan 13, 2026
584417a
Fix tests and add new
kuznetsss Jan 13, 2026
0737efc
Run pre-commit
kuznetsss Jan 13, 2026
22184b1
Merge branch 'develop' into Add_etl_system_state_command
kuznetsss Jan 15, 2026
bae896f
Run pre-commit
kuznetsss Jan 16, 2026
d4236b1
Fix flaky test
kuznetsss Jan 16, 2026
39d4b94
Run pre-commit
kuznetsss Jan 16, 2026
66615bc
Fix review issues
kuznetsss Jan 14, 2026
27d1eea
Merge branch 'develop' into Add_etl_system_state_command
kuznetsss Jan 14, 2026
207a281
Add Loading state
kuznetsss Jan 14, 2026
5adb3ef
Merge branch 'develop' into Add_etl_system_state_command
kuznetsss Jan 14, 2026
11e1ce2
Add syncing cache with DB
kuznetsss Jan 14, 2026
5e562cf
Fix ETLService tests
kuznetsss Jan 15, 2026
57d8863
Fix other tests
kuznetsss Jan 16, 2026
edc1dc9
Run pre-commit
kuznetsss Jan 16, 2026
8b02c3f
Fix hanging test
kuznetsss Jan 16, 2026
18f2fd9
More ClusterBackend tests for better coverage
kuznetsss Jan 16, 2026
cf9c29c
Add SystemState tests
kuznetsss Jan 16, 2026
8ccd388
Add WriterState test
kuznetsss Jan 16, 2026
7ed52b5
Merge branch 'develop' into 1974_Choose_writer_from_communication
kuznetsss Jan 20, 2026
3ad9d78
Fix review comments
kuznetsss Jan 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 23 additions & 4 deletions src/app/ClioApplication.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
#include "etl/ETLService.hpp"
#include "etl/LoadBalancer.hpp"
#include "etl/NetworkValidatedLedgers.hpp"
#include "etl/SystemState.hpp"
#include "etl/WriterState.hpp"
#include "feed/SubscriptionManager.hpp"
#include "migration/MigrationInspectorFactory.hpp"
#include "rpc/Counters.hpp"
Expand Down Expand Up @@ -121,7 +123,11 @@ ClioApplication::run(bool const useNgWebServer)
// Interface to the database
auto backend = data::makeBackend(config_, cache);

cluster::ClusterCommunicationService clusterCommunicationService{backend};
auto systemState = etl::SystemState::makeSystemState(config_);

cluster::ClusterCommunicationService clusterCommunicationService{
backend, std::make_unique<etl::WriterState>(systemState)
};
clusterCommunicationService.run();

auto const amendmentCenter = std::make_shared<data::AmendmentCenter const>(backend);
Expand Down Expand Up @@ -151,7 +157,9 @@ ClioApplication::run(bool const useNgWebServer)
);

// ETL is responsible for writing and publishing to streams. In read-only mode, ETL only publishes
auto etl = etl::ETLService::makeETLService(config_, ctx, backend, subscriptions, balancer, ledgers);
auto etl = etl::ETLService::makeETLService(
config_, std::move(systemState), ctx, backend, subscriptions, balancer, ledgers
);

auto workQueue = rpc::WorkQueue::makeWorkQueue(config_);
auto counters = rpc::Counters::makeCounters(workQueue);
Expand Down Expand Up @@ -197,7 +205,16 @@ ClioApplication::run(bool const useNgWebServer)
}

appStopper_.setOnStop(
Stopper::makeOnStopCallback(httpServer.value(), *balancer, *etl, *subscriptions, *backend, cacheSaver, ioc)
Stopper::makeOnStopCallback(
httpServer.value(),
*balancer,
*etl,
*subscriptions,
*backend,
cacheSaver,
clusterCommunicationService,
ioc
)
);

// Blocks until stopped.
Expand All @@ -213,7 +230,9 @@ ClioApplication::run(bool const useNgWebServer)

auto const httpServer = web::makeHttpServer(config_, ioc, dosGuard, handler, cache);
appStopper_.setOnStop(
Stopper::makeOnStopCallback(*httpServer, *balancer, *etl, *subscriptions, *backend, cacheSaver, ioc)
Stopper::makeOnStopCallback(
*httpServer, *balancer, *etl, *subscriptions, *backend, cacheSaver, clusterCommunicationService, ioc
)
);

// Blocks until stopped.
Expand Down
10 changes: 9 additions & 1 deletion src/app/Stopper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#pragma once

#include "cluster/Concepts.hpp"
#include "data/BackendInterface.hpp"
#include "data/LedgerCacheSaver.hpp"
#include "etl/ETLServiceInterface.hpp"
Expand Down Expand Up @@ -82,10 +83,14 @@ class Stopper {
* @param subscriptions The subscription manager to stop.
* @param backend The backend to stop.
* @param cacheSaver The ledger cache saver
* @param clusterCommunicationService The cluster communication service to stop.
* @param ioc The io_context to stop.
* @return The callback to be called on application stop.
*/
template <web::SomeServer ServerType, data::SomeLedgerCacheSaver LedgerCacheSaverType>
template <
web::SomeServer ServerType,
data::SomeLedgerCacheSaver LedgerCacheSaverType,
cluster::SomeClusterCommunicationService ClusterCommunicationServiceType>
static std::function<void(boost::asio::yield_context)>
makeOnStopCallback(
ServerType& server,
Expand All @@ -94,6 +99,7 @@ class Stopper {
feed::SubscriptionManagerInterface& subscriptions,
data::BackendInterface& backend,
LedgerCacheSaverType& cacheSaver,
ClusterCommunicationServiceType& clusterCommunicationService,
boost::asio::io_context& ioc
)
{
Expand All @@ -111,6 +117,8 @@ class Stopper {
});
coroutineGroup.asyncWait(yield);

clusterCommunicationService.stop();

etl.stop();
LOG(util::LogService::info()) << "ETL stopped";

Expand Down
141 changes: 141 additions & 0 deletions src/cluster/Backend.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.

Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.

THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================

#include "cluster/Backend.hpp"

#include "cluster/ClioNode.hpp"
#include "data/BackendInterface.hpp"
#include "etl/WriterState.hpp"

#include <boost/asio/bind_cancellation_slot.hpp>
#include <boost/asio/cancellation_type.hpp>
#include <boost/asio/error.hpp>
#include <boost/asio/execution_context.hpp>
#include <boost/asio/executor.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/thread_pool.hpp>
#include <boost/asio/use_future.hpp>
#include <boost/json/parse.hpp>
#include <boost/json/serialize.hpp>
#include <boost/json/value.hpp>
#include <boost/json/value_from.hpp>
#include <boost/json/value_to.hpp>
#include <boost/uuid/random_generator.hpp>
#include <boost/uuid/uuid.hpp>
#include <fmt/format.h>

#include <chrono>
#include <memory>
#include <utility>
#include <vector>

namespace cluster {

Backend::Backend(
boost::asio::thread_pool& ctx,
std::shared_ptr<data::BackendInterface> backend,
std::unique_ptr<etl::WriterStateInterface const> writerState,
std::chrono::steady_clock::duration readInterval,
std::chrono::steady_clock::duration writeInterval
)
: backend_(std::move(backend))
, writerState_(std::move(writerState))
, readerTask_(readInterval, ctx)
, writerTask_(writeInterval, ctx)
, selfUuid_(std::make_shared<boost::uuids::uuid>(boost::uuids::random_generator{}()))
{
}

void
Backend::run()
{
readerTask_.run([this](boost::asio::yield_context yield) {
auto clusterData = doRead(yield);
onNewState_(selfUuid_, std::make_shared<ClusterData>(std::move(clusterData)));
});

writerTask_.run([this]() { doWrite(); });
}

Backend::~Backend()
{
stop();
}

void
Backend::stop()
{
readerTask_.stop();
writerTask_.stop();
}

ClioNode::CUuid
Backend::selfId() const
{
return selfUuid_;
}

Backend::ClusterData
Backend::doRead(boost::asio::yield_context yield)
{
BackendInterface::ClioNodesDataFetchResult expectedResult;
try {
expectedResult = backend_->fetchClioNodesData(yield);
} catch (...) {
expectedResult = std::unexpected{"Failed to fetch Clio nodes data"};
}

if (!expectedResult.has_value()) {
return std::unexpected{std::move(expectedResult).error()};
}

std::vector<ClioNode> otherNodesData;
for (auto const& [uuid, nodeDataStr] : expectedResult.value()) {
if (uuid == *selfUuid_) {
continue;
}

boost::system::error_code errorCode;
auto const json = boost::json::parse(nodeDataStr, errorCode);
if (errorCode.failed()) {
return std::unexpected{fmt::format("Error parsing json from DB: {}", nodeDataStr)};
}

auto expectedNodeData = boost::json::try_value_to<ClioNode>(json);
if (expectedNodeData.has_error()) {
return std::unexpected{fmt::format("Error converting json to ClioNode: {}", nodeDataStr)};
}
*expectedNodeData->uuid = uuid;
otherNodesData.push_back(std::move(expectedNodeData).value());
}
otherNodesData.push_back(ClioNode::from(selfUuid_, *writerState_));
return otherNodesData;
}

void
Backend::doWrite()
{
auto const selfData = ClioNode::from(selfUuid_, *writerState_);
boost::json::value jsonValue{};
boost::json::value_from(selfData, jsonValue);
backend_->writeNodeMessage(*selfData.uuid, boost::json::serialize(jsonValue.as_object()));
}

} // namespace cluster
147 changes: 147 additions & 0 deletions src/cluster/Backend.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.

Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.

THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================

#pragma once

#include "cluster/ClioNode.hpp"
#include "cluster/impl/RepeatedTask.hpp"
#include "data/BackendInterface.hpp"
#include "etl/WriterState.hpp"
#include "util/log/Logger.hpp"

#include <boost/asio/any_io_executor.hpp>
#include <boost/asio/cancellation_signal.hpp>
#include <boost/asio/execution_context.hpp>
#include <boost/asio/executor.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/strand.hpp>
#include <boost/asio/thread_pool.hpp>
#include <boost/signals2/connection.hpp>
#include <boost/signals2/signal.hpp>
#include <boost/signals2/variadic_signal.hpp>
#include <boost/uuid/uuid.hpp>

#include <chrono>
#include <concepts>
#include <memory>
#include <string>
#include <vector>

namespace cluster {

/**
* @brief Backend communication handler for cluster state synchronization.
*
* This class manages reading and writing cluster state information to/from the backend database.
* It periodically reads the state of other nodes in the cluster and writes the current node's state,
* enabling cluster-wide coordination and awareness.
*/
class Backend {
public:
/** @brief Type representing cluster data result - either a vector of nodes or an error message */
using ClusterData = std::expected<std::vector<ClioNode>, std::string>;

private:
util::Logger log_{"ClusterCommunication"};

std::shared_ptr<data::BackendInterface> backend_;
std::unique_ptr<etl::WriterStateInterface const> writerState_;

impl::RepeatedTask<boost::asio::thread_pool> readerTask_;
impl::RepeatedTask<boost::asio::thread_pool> writerTask_;

ClioNode::Uuid selfUuid_;

boost::signals2::signal<void(ClioNode::CUuid, std::shared_ptr<ClusterData const>)> onNewState_;

public:
/**
* @brief Construct a Backend communication handler.
*
* @param ctx The execution context for asynchronous operations
* @param backend Interface to the backend database
* @param writerState State indicating whether this node is writing to the database
* @param readInterval How often to read cluster state from the backend
* @param writeInterval How often to write this node's state to the backend
*/
Backend(
boost::asio::thread_pool& ctx,
std::shared_ptr<data::BackendInterface> backend,
std::unique_ptr<etl::WriterStateInterface const> writerState,
std::chrono::steady_clock::duration readInterval,
std::chrono::steady_clock::duration writeInterval
);

~Backend();

Backend(Backend&&) = delete;
Backend&
operator=(Backend&&) = delete;
Backend(Backend const&) = delete;
Backend&
operator=(Backend const&) = delete;

/**
* @brief Start the backend read and write tasks.
*
* Begins periodic reading of cluster state from the backend and writing of this node's state.
*/
void
run();

/**
* @brief Stop the backend read and write tasks.
*
* Stops all periodic tasks and waits for them to complete.
*/
void
stop();

/**
* @brief Subscribe to new cluster state notifications.
*
* @tparam S Callable type accepting (ClioNode::cUUID, ClusterData)
* @param s Subscriber callback to be invoked when new cluster state is available
* @return A connection object that can be used to unsubscribe
*/
template <typename S>
requires std::invocable<S, ClioNode::CUuid, std::shared_ptr<ClusterData const>>
boost::signals2::connection
subscribeToNewState(S&& s)
{
return onNewState_.connect(s);
}

/**
* @brief Get the UUID of this node in the cluster.
*
* @return The UUID of this node.
*/
ClioNode::CUuid
selfId() const;

private:
ClusterData
doRead(boost::asio::yield_context yield);

void
doWrite();
};

} // namespace cluster
Loading