Skip to content
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
ad8c588
Fix some bugs
kuznetsss Nov 19, 2025
19b4550
Merge branch 'develop' into 1974_Choose_writer_from_communication
kuznetsss Nov 20, 2025
db86ff9
Change flags in etl SystemState
kuznetsss Nov 20, 2025
1b00f28
Add db role to ClioNode
kuznetsss Nov 21, 2025
a32177a
Add WriterState
kuznetsss Nov 24, 2025
49c3eb4
Refactored ClusterCommunicationService
kuznetsss Nov 25, 2025
a68a874
Fix docs
kuznetsss Nov 25, 2025
ff7cc6a
Add metrics subservice
kuznetsss Nov 25, 2025
0e514d7
Move implementation to cpp and add docs
kuznetsss Nov 26, 2025
e84cf0a
Add WriterDecider
kuznetsss Nov 26, 2025
6181abc
Create cpp file and add docs
kuznetsss Nov 26, 2025
f855d8d
Add tests for ClioNode
kuznetsss Nov 26, 2025
e8409a3
WIP
kuznetsss Nov 26, 2025
8fa1cb9
Add tests for RepeatedTask
kuznetsss Nov 27, 2025
60e3ce0
Add Backend tests
kuznetsss Dec 2, 2025
55ba0ab
Add tests for Metrics
kuznetsss Dec 2, 2025
4697b1d
Add tests for WriterDecider
kuznetsss Dec 3, 2025
5758575
Add tests for ClusterCommunicationService
kuznetsss Dec 3, 2025
4818704
Merge branch 'develop' into 1974_Choose_writer_from_communication
kuznetsss Dec 3, 2025
da5008b
Fix typo in variable name
kuznetsss Dec 3, 2025
e16efbf
Add clusterCommunicationService to graceful shutdown
kuznetsss Dec 4, 2025
fde125f
Merge branch 'develop' into 1974_Choose_writer_from_communication
kuznetsss Dec 10, 2025
847ed98
Add writing command to etl::SystemState
kuznetsss Jan 8, 2026
fd01dbb
Improve channels
kuznetsss Jan 8, 2026
c9cdcd0
Add async framework support in Channel
kuznetsss Jan 8, 2026
d71c466
Apply bug fixes
kuznetsss Jan 8, 2026
4573575
Add tests
kuznetsss Jan 9, 2026
4599a70
Run pre-commit
kuznetsss Jan 9, 2026
baddeff
Merge and fix build
kuznetsss Jan 13, 2026
a64c8c4
Add fallback writer role
kuznetsss Jan 13, 2026
584417a
Fix tests and add new
kuznetsss Jan 13, 2026
0737efc
Run pre-commit
kuznetsss Jan 13, 2026
22184b1
Merge branch 'develop' into Add_etl_system_state_command
kuznetsss Jan 15, 2026
bae896f
Run pre-commit
kuznetsss Jan 16, 2026
d4236b1
Fix flaky test
kuznetsss Jan 16, 2026
39d4b94
Run pre-commit
kuznetsss Jan 16, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 35 additions & 9 deletions src/etl/ETLService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ ETLService::ETLService(
, state_(std::move(state))
, startSequence_(config.get().maybeValue<uint32_t>("start_sequence"))
, finishSequence_(config.get().maybeValue<uint32_t>("finish_sequence"))
, writeCommandStrand_(ctx_.makeStrand())
{
ASSERT(not state_->isWriting, "ETL should never start in writer mode");

Expand Down Expand Up @@ -232,6 +233,13 @@ ETLService::stop()
{
LOG(log_.info()) << "Stop called";

systemStateWriteCommandSubscription_.disconnect();
auto count = runningWriteCommandHandlers_.load();
while (count != 0) {
runningWriteCommandHandlers_.wait(count); // Blocks until value changes
count = runningWriteCommandHandlers_.load();
}

if (mainLoop_)
mainLoop_->wait();
if (taskMan_)
Expand Down Expand Up @@ -348,21 +356,40 @@ ETLService::startMonitor(uint32_t seq)
{
monitor_ = monitorProvider_->make(ctx_, backend_, ledgers_, seq);

systemStateWriteCommandSubscription_ =
state_->writeCommandSignal.connect([this](SystemState::WriteCommand command) {
++runningWriteCommandHandlers_;
writeCommandStrand_.submit([this, command]() {
switch (command) {
case etl::SystemState::WriteCommand::StartWriting:
attemptTakeoverWriter();
break;
case etl::SystemState::WriteCommand::StopWriting:
giveUpWriter();
break;
}
--runningWriteCommandHandlers_;
runningWriteCommandHandlers_.notify_one();
});
});

monitorNewSeqSubscription_ = monitor_->subscribeToNewSequence([this](uint32_t seq) {
LOG(log_.info()) << "ETLService (via Monitor) got new seq from db: " << seq;

if (state_->writeConflict) {
LOG(log_.info()) << "Got a write conflict; Giving up writer seat immediately";
giveUpWriter();
}
auto const cacheNeedsUpdate = backend_->cache().latestLedgerSequence() < seq;
auto const backendRange = backend_->fetchLedgerRange();
auto const backendNeedsUpdate = backendRange.has_value() and backendRange->maxSequence < seq;

if (not state_->isWriting) {
if (cacheNeedsUpdate or backendNeedsUpdate) {
auto const diff = data::synchronousAndRetryOnTimeout([this, seq](auto yield) {
return backend_->fetchLedgerDiff(seq, yield);
});

cacheUpdater_->update(seq, diff);
backend_->updateRange(seq);
if (cacheNeedsUpdate)
cacheUpdater_->update(seq, diff);

if (backendNeedsUpdate)
backend_->updateRange(seq);
}

publisher_->publish(seq, {});
Expand All @@ -371,7 +398,7 @@ ETLService::startMonitor(uint32_t seq)
monitorDbStalledSubscription_ = monitor_->subscribeToDbStalled([this]() {
LOG(log_.warn()) << "ETLService received DbStalled signal from Monitor";
if (not state_->isStrictReadonly and not state_->isWriting)
attemptTakeoverWriter();
state_->writeCommandSignal(SystemState::WriteCommand::StartWriting);
});

monitor_->run();
Expand Down Expand Up @@ -404,7 +431,6 @@ ETLService::giveUpWriter()
{
ASSERT(not state_->isStrictReadonly, "This should only happen on writer nodes");
state_->isWriting = false;
state_->writeConflict = false;
taskMan_ = nullptr;
}

Expand Down
6 changes: 5 additions & 1 deletion src/etl/ETLService.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
#include "feed/SubscriptionManagerInterface.hpp"
#include "util/async/AnyExecutionContext.hpp"
#include "util/async/AnyOperation.hpp"
#include "util/async/AnyStrand.hpp"
#include "util/config/ConfigDefinition.hpp"
#include "util/log/Logger.hpp"

Expand All @@ -69,12 +70,12 @@
#include <xrpl/protocol/TxFormats.h>
#include <xrpl/protocol/TxMeta.h>

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <optional>
#include <string>

namespace etl {

Expand Down Expand Up @@ -117,6 +118,9 @@ class ETLService : public ETLServiceInterface {

boost::signals2::scoped_connection monitorNewSeqSubscription_;
boost::signals2::scoped_connection monitorDbStalledSubscription_;
boost::signals2::scoped_connection systemStateWriteCommandSubscription_;
util::async::AnyStrand writeCommandStrand_;
std::atomic<size_t> runningWriteCommandHandlers_{0};

std::optional<util::async::AnyOperation<void>> mainLoop_;

Expand Down
24 changes: 21 additions & 3 deletions src/etl/SystemState.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@

#pragma once

#include "util/Channel.hpp"
#include "util/prometheus/Bool.hpp"
#include "util/prometheus/Label.hpp"
#include "util/prometheus/Prometheus.hpp"

#include <atomic>
#include <boost/signals2/signal.hpp>
#include <boost/signals2/variadic_signal.hpp>

namespace etl {

Expand All @@ -50,8 +52,24 @@ struct SystemState {
"Whether the process is writing to the database"
);

std::atomic_bool isStopping = false; /**< @brief Whether the software is stopping. */
std::atomic_bool writeConflict = false; /**< @brief Whether a write conflict was detected. */
/**
* @brief Commands for controlling the ETL writer state.
*
* These commands are emitted via writeCommandSignal to coordinate writer state transitions across components.
*/
enum class WriteCommand {
StartWriting, /**< Request to attempt taking over as the ETL writer */
StopWriting /**< Request to give up the ETL writer role (e.g., due to write conflict) */
};

/**
* @brief Signal for coordinating ETL writer state transitions.
*
* This signal allows components to request changes to the writer state without direct coupling.
* - Emitted with StartWriting when database stalls and node should attempt to become writer
* - Emitted with StopWriting when write conflicts are detected
*/
boost::signals2::signal<void(WriteCommand)> writeCommandSignal;

/**
* @brief Whether clio detected an amendment block.
Expand Down
17 changes: 16 additions & 1 deletion src/etl/impl/LedgerPublisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#include <xrpl/protocol/Serializer.h>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
Expand Down Expand Up @@ -76,6 +77,8 @@ class LedgerPublisher : public LedgerPublisherInterface {

util::async::AnyStrand publishStrand_;

std::atomic_bool stop_{false};

std::shared_ptr<BackendInterface> backend_;
std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions_;
std::reference_wrapper<SystemState const> state_; // shared state for ETL
Expand Down Expand Up @@ -125,7 +128,7 @@ class LedgerPublisher : public LedgerPublisherInterface {
{
LOG(log_.info()) << "Attempting to publish ledger = " << ledgerSequence;
size_t numAttempts = 0;
while (not state_.get().isStopping) {
while (not stop_) {
auto range = backend_->hardFetchLedgerRangeNoThrow();

if (!range || range->maxSequence < ledgerSequence) {
Expand Down Expand Up @@ -258,6 +261,18 @@ class LedgerPublisher : public LedgerPublisherInterface {
return *lastPublishedSequence_.lock();
}

/**
* @brief Stops publishing
*
* @note This is a basic implementation to satisfy tests. This will be improved in
* https://github.com/XRPLF/clio/issues/2833
*/
void
stop()
{
stop_ = true;
}

private:
void
setLastClose(std::chrono::time_point<ripple::NetClock> lastCloseTime)
Expand Down
2 changes: 1 addition & 1 deletion src/etl/impl/Loading.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ Loader::load(model::LedgerData const& data)
<< "; took " << duration << "ms";

if (not success) {
state_->writeConflict = true;
state_->writeCommandSignal(SystemState::WriteCommand::StopWriting);
LOG(log_.warn()) << "Another node wrote a ledger into the DB - we have a write conflict";
return std::unexpected(LoaderError::WriteConflict);
}
Expand Down
Loading
Loading