129 changes: 129 additions & 0 deletions crates/autopilot/src/leader_lock_tracker.rs
@@ -0,0 +1,129 @@
use {database::leader_pg_lock::LeaderLock, observe::metrics};

/// Tracks the autopilot leader lock status.
/// The status is updated on each call to try_acquire().
#[expect(clippy::large_enum_variant)]
pub enum LeaderLockTracker {
/// Leader lock mechanism is disabled.
/// Only one autopilot instance should be running at all times.
Disabled,
/// Leader lock mechanism is enabled.
/// This allows for multiple autopilots to run simultaneously with only one
/// driving the auctions forward.
/// The autopilot followers (not holding the lock) keep their caches
/// warm, which facilitates zero-downtime deployments.
Enabled {
/// Whether we are the leader as of the last call to try_acquire().
is_leader: bool,
/// Whether we were the leader before the last call to try_acquire().
was_leader: bool,
leader_lock: LeaderLock,
},
}

impl LeaderLockTracker {
pub fn new(leader_lock: Option<LeaderLock>) -> Self {
match leader_lock {
Some(leader_lock) => Self::Enabled {
is_leader: false,
was_leader: false,
leader_lock,
},
None => Self::Disabled,
}
}

/// Tries to acquire the leader lock if the mechanism is enabled;
/// otherwise does nothing.
/// Should be called at the beginning of every run loop iteration.
pub async fn try_acquire(&mut self) {
let Self::Enabled {
is_leader,
was_leader,
leader_lock,
} = self
else {
return;
};

*was_leader = *is_leader;

*is_leader = leader_lock.try_acquire().await.unwrap_or_else(|err| {
tracing::error!(?err, "failed to become leader");
Metrics::leader_lock_error();
false
});

if self.just_stepped_up() {
tracing::info!("Stepped up as a leader");
Metrics::leader_step_up();
}
}

/// Releases the leader lock if it was held.
/// Should be called after breaking out of the run loop (for example, due
/// to shutdown).
pub async fn release(self) {
if let Self::Enabled {
mut leader_lock,
is_leader: true,
..
} = self
{
tracing::info!("Shutdown received, stepping down as the leader");
leader_lock.release().await;
Metrics::leader_step_down();
}
}

/// Returns true if the last try_acquire call resulted in acquiring the
/// leader lock. If the feature is disabled, always returns false.
pub fn just_stepped_up(&self) -> bool {
matches!(
self,
Self::Enabled {
is_leader: true,
was_leader: false,
..
}
)
}

/// Returns true if the leader lock is currently held.
/// If the feature is disabled, always returns true.
pub fn is_leader(&self) -> bool {
match self {
Self::Enabled { is_leader, .. } => *is_leader,
_ => true,
}
}
}

#[derive(prometheus_metric_storage::MetricStorage)]
#[metric(subsystem = "leader_lock_tracker")]
struct Metrics {
/// Tracks the current leader status
/// 1 - is currently autopilot leader
/// 0 - is currently autopilot follower
is_leader: prometheus::IntGauge,

/// Tracks the count of errors acquiring the leader lock (should never happen)
leader_lock_error: prometheus::IntCounter,
}

impl Metrics {
fn get() -> &'static Self {
Metrics::instance(metrics::get_storage_registry()).unwrap()
}

fn leader_step_up() {
Self::get().is_leader.set(1)
}

fn leader_step_down() {
Self::get().is_leader.set(0)
}

fn leader_lock_error() {
Self::get().leader_lock_error.inc()
}
}
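
For orientation, here is a minimal sketch of how this tracker is intended to be driven from the autopilot run loop (mirroring the run_loop.rs changes further down in this diff); the shutdown check and the leader-only work are simplified placeholders, not the actual implementation:

// Illustrative sketch only — not part of this PR's code.
async fn run_loop_sketch(leader_lock: Option<LeaderLock>, mut should_shutdown: impl FnMut() -> bool) {
    let mut tracker = LeaderLockTracker::new(leader_lock);
    while !should_shutdown() {
        // Refresh the leadership status once per iteration.
        tracker.try_acquire().await;
        if !tracker.is_leader() {
            // Followers only keep their caches warm and back off briefly.
            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
            continue;
        }
        // ... leader-only work: cut and settle the next auction ...
    }
    // Step down cleanly on shutdown so another instance can take over quickly.
    tracker.release().await;
}
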
1 change: 1 addition & 0 deletions crates/autopilot/src/lib.rs
@@ -4,6 +4,7 @@ pub mod database;
pub mod domain;
pub mod event_updater;
pub mod infra;
mod leader_lock_tracker;
mod maintenance;
pub mod periodic_db_cleanup;
pub mod run;
22 changes: 17 additions & 5 deletions crates/autopilot/src/run.rs
@@ -538,8 +538,13 @@ pub async fn run(args: Arguments, shutdown_controller: ShutdownController) {
);

let liveness = Arc::new(Liveness::new(args.max_auction_age));
let readiness = Arc::new(Some(AtomicBool::new(false)));
observe::metrics::serve_metrics(liveness.clone(), args.metrics_address, readiness.clone());
let startup = Arc::new(Some(AtomicBool::new(false)));
observe::metrics::serve_metrics(
liveness.clone(),
args.metrics_address,
Default::default(),
startup.clone(),
);

let order_events_cleaner_config = crate::periodic_db_cleanup::OrderEventsCleanerConfig::new(
args.order_events_cleanup_interval,
@@ -691,8 +696,10 @@
solver_participation_guard,
solvable_orders_cache,
trusted_tokens,
liveness.clone(),
readiness.clone(),
run_loop::Probes {
liveness: liveness.clone(),
startup,
},
Arc::new(maintenance),
competition_updates_sender,
);
@@ -772,7 +779,12 @@ async fn shadow_mode(args: Arguments) -> ! {
};

let liveness = Arc::new(Liveness::new(args.max_auction_age));
observe::metrics::serve_metrics(liveness.clone(), args.metrics_address, Default::default());
observe::metrics::serve_metrics(
liveness.clone(),
args.metrics_address,
Default::default(),
Default::default(),
);

let current_block = ethrpc::block_stream::current_block_stream(
args.shared.node_url,
79 changes: 23 additions & 56 deletions crates/autopilot/src/run_loop.rs
@@ -20,6 +20,7 @@ use {
self,
solvers::dto::{settle, solve},
},
leader_lock_tracker::LeaderLockTracker,
maintenance::Maintenance,
run::Liveness,
shutdown_controller::ShutdownController,
@@ -68,6 +69,11 @@ pub struct Config {
pub enable_leader_lock: bool,
}

pub struct Probes {
pub liveness: Arc<Liveness>,
pub startup: Arc<Option<AtomicBool>>,
}
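
The startup flag is only flipped once the run loop has warmed its caches (see below), so a startup or readiness endpoint can gate on it. As a rough, hypothetical illustration of how such an endpoint might read the flag (the real handler lives in observe::metrics and is not part of this diff):

// Hypothetical probe handler, for illustration only.
fn startup_status(startup: &Arc<Option<AtomicBool>>) -> u16 {
    match startup.as_ref() {
        // Report ready only after the run loop marked its caches as warm.
        Some(flag) if flag.load(Ordering::Acquire) => 200,
        // Flag present but caches not warm yet.
        Some(_) => 503,
        // Probe disabled (e.g. shadow mode passes Default::default()): always ready.
        None => 200,
    }
}
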

pub struct RunLoop {
config: Config,
eth: infra::Ethereum,
@@ -77,8 +83,7 @@ pub struct RunLoop {
solvable_orders_cache: Arc<SolvableOrdersCache>,
trusted_tokens: AutoUpdatingTokenList,
in_flight_orders: Arc<Mutex<HashSet<OrderUid>>>,
liveness: Arc<Liveness>,
readiness: Arc<Option<AtomicBool>>,
probes: Probes,
/// Maintenance tasks that should run before every runloop to have
/// the most recent data available.
maintenance: Arc<Maintenance>,
@@ -96,8 +101,7 @@ impl RunLoop {
solver_participation_guard: SolverParticipationGuard,
solvable_orders_cache: Arc<SolvableOrdersCache>,
trusted_tokens: AutoUpdatingTokenList,
liveness: Arc<Liveness>,
readiness: Arc<Option<AtomicBool>>,
probes: Probes,
maintenance: Arc<Maintenance>,
competition_updates_sender: tokio::sync::mpsc::UnboundedSender<()>,
) -> Self {
@@ -113,8 +117,7 @@
solvable_orders_cache,
trusted_tokens,
in_flight_orders: Default::default(),
liveness,
readiness,
probes,
maintenance,
competition_updates_sender,
winner_selection: winner_selection::Arbitrator { max_winners, weth },
@@ -131,7 +134,7 @@
let mut last_block = None;

let self_arc = Arc::new(self);
let mut leader = if self_arc.config.enable_leader_lock {
let leader = if self_arc.config.enable_leader_lock {
Some(
self_arc
.persistence
@@ -141,37 +144,26 @@
} else {
None
};
let mut was_leader = false;
let mut leader_lock_tracker = LeaderLockTracker::new(leader);

while !control.should_shutdown() {
let is_leader = if let Some(ref mut leader) = leader {
leader.try_acquire().await.unwrap_or_else(|err| {
tracing::error!(?err, "failed to become leader");
Metrics::leader_lock_error();
false
})
} else {
true
};
leader_lock_tracker.try_acquire().await;

if leader.is_some() && is_leader && !was_leader {
tracing::info!("Stepped up as a leader");
Metrics::leader_step_up();
let start_block = self_arc
.update_caches(&mut last_block, leader_lock_tracker.is_leader())
.await;

// Caches are warmed up, so this instance is ready to take over leader work if needed.
if let Some(startup) = self_arc.probes.startup.as_ref() {
startup.store(true, Ordering::Release);
}
was_leader = is_leader;

let start_block = self_arc.update_caches(&mut last_block, is_leader).await;
if !is_leader {
if !leader_lock_tracker.is_leader() {
// only the leader is supposed to run the auctions
tokio::time::sleep(Duration::from_millis(200)).await;
continue;
}

// caches are warmed up, we're ready to do leader work
if let Some(readiness) = self_arc.readiness.as_ref() {
readiness.store(true, Ordering::Release);
}

if let Some(auction) = self_arc
.next_auction(start_block, &mut last_auction, &mut last_block)
.await
@@ -183,12 +175,7 @@
.await
}
}

if let Some(mut leader) = leader {
tracing::info!("Shutdown received, stepping down as the leader");
leader.release().await;
Metrics::leader_step_down();
}
leader_lock_tracker.release().await;
}

async fn update_caches(&self, prev_block: &mut Option<H256>, is_leader: bool) -> BlockInfo {
@@ -248,7 +235,7 @@
}

observe::log_auction_delta(&previous, &auction);
self.liveness.auction();
self.probes.liveness.auction();
Metrics::auction_ready(start_block.observed_at);
Some(auction)
}
@@ -284,7 +271,7 @@

if auction.orders.is_empty() {
// Update the liveness probe so it does not report unhealthy due to this optimization
self.liveness.auction();
self.probes.liveness.auction();
tracing::debug!("skipping empty auction");
return None;
}
@@ -1003,14 +990,6 @@ struct Metrics {
/// function is started.
#[metric(buckets(0, 0.25, 0.5, 0.75, 1, 1.5, 2, 2.5, 3, 4, 5, 6))]
current_block_delay: prometheus::Histogram,

/// Tracks the current leader status
/// 1 - is currently autopilot leader
/// 0 - is currently autopilot follower
is_leader: prometheus::IntGauge,

/// Trackes the count of errors acquiring leader lock (should never happen)
leader_lock_error: prometheus::IntCounter,
}

impl Metrics {
@@ -1111,18 +1090,6 @@ impl Metrics {
.current_block_delay
.observe(init_block_timestamp.elapsed().as_secs_f64())
}

fn leader_step_up() {
Self::get().is_leader.set(1)
}

fn leader_step_down() {
Self::get().is_leader.set(0)
}

fn leader_lock_error() {
Self::get().leader_lock_error.inc()
}
}

pub mod observe {