Skip to content

Commit eabe512

Browse files
committed
Move Leader lock to separate file
1 parent 4edf489 commit eabe512

File tree

4 files changed

+138
-116
lines changed

4 files changed

+138
-116
lines changed
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
use {database::leader_pg_lock::LeaderLock, observe::metrics};
2+
3+
/// Tracks the autopilot leader lock status.
/// Leader lock status is tracked based on calls to try_acquire()
#[expect(clippy::large_enum_variant)]
pub enum LeaderLockTracker {
    /// Leader lock mechanism is disabled.
    /// Only one autopilot instance should be running at all times.
    Disabled,
    /// Leader lock mechanism is enabled.
    /// This allows for multiple autopilots to run simultaneously with only one
    /// driving the auctions forward.
    /// The autopilot followers (not holding the lock) will keep their caches
    /// warm. It facilitates zero downtime deployments.
    Enabled {
        /// Are we currently the leader since the last call to try_acquire()
        is_leader: bool,
        /// Were we previously the leader since the last call to try_acquire()
        was_leader: bool,
        /// The underlying database lock used for leader election.
        leader_lock: LeaderLock,
    },
}
23+
24+
impl LeaderLockTracker {
25+
pub fn new(leader_lock: Option<LeaderLock>) -> Self {
26+
match leader_lock {
27+
Some(leader_lock) => Self::Enabled {
28+
is_leader: false,
29+
was_leader: false,
30+
leader_lock,
31+
},
32+
None => Self::Disabled,
33+
}
34+
}
35+
36+
/// Tries to acquire the leader lock if it's enabled
37+
/// If not, does nothing
38+
/// Should be called at the beginning of every run loop iteration
39+
pub async fn try_acquire(&mut self) {
40+
let Self::Enabled {
41+
is_leader,
42+
was_leader,
43+
leader_lock,
44+
} = self
45+
else {
46+
return;
47+
};
48+
49+
*was_leader = *is_leader;
50+
51+
*is_leader = leader_lock.try_acquire().await.unwrap_or_else(|err| {
52+
tracing::error!(?err, "failed to become leader");
53+
Metrics::leader_lock_error();
54+
false
55+
});
56+
57+
if self.just_stepped_up() {
58+
tracing::info!("Stepped up as a leader");
59+
Metrics::leader_step_up();
60+
}
61+
}
62+
63+
/// Releases the leader lock if it was held
64+
/// Should be called after breaking out of run loop (for example: due to
65+
/// shutdown)
66+
pub async fn release(self) {
67+
if let Self::Enabled {
68+
mut leader_lock,
69+
is_leader: true,
70+
..
71+
} = self
72+
{
73+
tracing::info!("Shutdown received, stepping down as the leader");
74+
leader_lock.release().await;
75+
Metrics::leader_step_down();
76+
}
77+
}
78+
79+
/// Returns true if the last try_acquire call resulted in acquiring the
80+
/// leader lock If the feature is disabled, always returns false
81+
pub fn just_stepped_up(&self) -> bool {
82+
matches!(
83+
self,
84+
Self::Enabled {
85+
is_leader: true,
86+
was_leader: false,
87+
..
88+
}
89+
)
90+
}
91+
92+
/// Returns true if the leader lock is being held
93+
/// If the feature is disabled, always returns true
94+
pub fn is_leader(&self) -> bool {
95+
match self {
96+
Self::Enabled { is_leader, .. } => *is_leader,
97+
_ => true,
98+
}
99+
}
100+
}
101+
#[derive(prometheus_metric_storage::MetricStorage)]
#[metric(subsystem = "leader_lock_tracker")]
struct Metrics {
    /// Tracks the current leader status
    /// 1 - is currently autopilot leader
    /// 0 - is currently autopilot follower
    is_leader: prometheus::IntGauge,

    /// Tracks the count of errors acquiring leader lock (should never happen)
    leader_lock_error: prometheus::IntCounter,
}
112+
113+
impl Metrics {
114+
fn get() -> &'static Self {
115+
Metrics::instance(metrics::get_storage_registry()).unwrap()
116+
}
117+
118+
fn leader_step_up() {
119+
Self::get().is_leader.set(1)
120+
}
121+
122+
fn leader_step_down() {
123+
Self::get().is_leader.set(0)
124+
}
125+
126+
fn leader_lock_error() {
127+
Self::get().leader_lock_error.inc()
128+
}
129+
}

crates/autopilot/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ pub mod database;
44
pub mod domain;
55
pub mod event_updater;
66
pub mod infra;
7+
mod leader_lock_tracker;
78
mod maintenance;
89
pub mod periodic_db_cleanup;
910
pub mod run;

crates/autopilot/src/run_loop.rs

Lines changed: 2 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,15 @@ use {
2020
self,
2121
solvers::dto::{settle, solve},
2222
},
23+
leader_lock_tracker::LeaderLockTracker,
2324
maintenance::Maintenance,
2425
run::Liveness,
2526
shutdown_controller::ShutdownController,
2627
solvable_orders::SolvableOrdersCache,
2728
},
2829
::observe::metrics,
2930
anyhow::{Context, Result},
30-
database::{leader_pg_lock::LeaderLock, order_events::OrderEventLabel},
31+
database::order_events::OrderEventLabel,
3132
ethrpc::block_stream::BlockInfo,
3233
futures::{FutureExt, TryFutureExt},
3334
itertools::Itertools,
@@ -90,95 +91,6 @@ pub struct RunLoop {
9091
winner_selection: winner_selection::Arbitrator,
9192
}
9293

93-
/// Tracks the autopilot leader lock status.
94-
#[expect(clippy::large_enum_variant)]
95-
enum LeaderLockTracker {
96-
Disabled,
97-
Enabled {
98-
is_leader: bool,
99-
was_leader: bool,
100-
leader_lock: LeaderLock,
101-
},
102-
}
103-
104-
impl LeaderLockTracker {
105-
pub fn new(leader_lock: Option<LeaderLock>) -> Self {
106-
match leader_lock {
107-
Some(leader_lock) => Self::Enabled {
108-
is_leader: false,
109-
was_leader: false,
110-
leader_lock,
111-
},
112-
None => Self::Disabled,
113-
}
114-
}
115-
116-
/// Tries to acquire the leader lock if it's enabled
117-
/// If not, does nothing
118-
/// Should be called at the beginning of every run loop iteration
119-
pub async fn try_acquire(&mut self) {
120-
let Self::Enabled {
121-
is_leader,
122-
was_leader,
123-
leader_lock,
124-
} = self
125-
else {
126-
return;
127-
};
128-
129-
*was_leader = *is_leader;
130-
131-
*is_leader = leader_lock.try_acquire().await.unwrap_or_else(|err| {
132-
tracing::error!(?err, "failed to become leader");
133-
Metrics::leader_lock_error();
134-
false
135-
});
136-
137-
if self.just_stepped_up() {
138-
tracing::info!("Stepped up as a leader");
139-
Metrics::leader_step_up();
140-
}
141-
}
142-
143-
/// Releases the leader lock if it was held
144-
/// Should be called after breaking out of run loop (for example: due to
145-
/// shutdown)
146-
pub async fn release(self) {
147-
if let Self::Enabled {
148-
mut leader_lock,
149-
is_leader: true,
150-
..
151-
} = self
152-
{
153-
tracing::info!("Shutdown received, stepping down as the leader");
154-
leader_lock.release().await;
155-
Metrics::leader_step_down();
156-
}
157-
}
158-
159-
/// Returns true if the last try_acquire call resulted in acquiring the
160-
/// leader lock If the feature is disabled, always returns false
161-
pub fn just_stepped_up(&self) -> bool {
162-
match self {
163-
Self::Enabled {
164-
is_leader: true,
165-
was_leader: false,
166-
..
167-
} => true,
168-
_ => false,
169-
}
170-
}
171-
172-
/// Returns true if the leader lock is being held
173-
/// If the feature is disabled, always returns true
174-
pub fn is_leader(&self) -> bool {
175-
match self {
176-
Self::Enabled { is_leader, .. } => *is_leader,
177-
_ => true,
178-
}
179-
}
180-
}
181-
18294
impl RunLoop {
18395
#[expect(clippy::too_many_arguments)]
18496
pub fn new(
@@ -1078,14 +990,6 @@ struct Metrics {
1078990
/// function is started.
1079991
#[metric(buckets(0, 0.25, 0.5, 0.75, 1, 1.5, 2, 2.5, 3, 4, 5, 6))]
1080992
current_block_delay: prometheus::Histogram,
1081-
1082-
/// Tracks the current leader status
1083-
/// 1 - is currently autopilot leader
1084-
/// 0 - is currently autopilot follower
1085-
is_leader: prometheus::IntGauge,
1086-
1087-
/// Trackes the count of errors acquiring leader lock (should never happen)
1088-
leader_lock_error: prometheus::IntCounter,
1089993
}
1090994

1091995
impl Metrics {
@@ -1186,18 +1090,6 @@ impl Metrics {
11861090
.current_block_delay
11871091
.observe(init_block_timestamp.elapsed().as_secs_f64())
11881092
}
1189-
1190-
fn leader_step_up() {
1191-
Self::get().is_leader.set(1)
1192-
}
1193-
1194-
fn leader_step_down() {
1195-
Self::get().is_leader.set(0)
1196-
}
1197-
1198-
fn leader_lock_error() {
1199-
Self::get().leader_lock_error.inc()
1200-
}
12011093
}
12021094

12031095
pub mod observe {

crates/observe/src/metrics.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -91,9 +91,9 @@ pub fn serve_metrics(
9191
startup: Arc<Option<AtomicBool>>,
9292
) -> JoinHandle<()> {
9393
let filter = handle_metrics()
94-
.or(handle_liveness(liveness))
95-
.or(handle_readiness(readiness))
96-
.or(handle_startup(startup));
94+
.or(handle_liveness_probe(liveness))
95+
.or(handle_readiness_probe(readiness))
96+
.or(handle_startup_probe(startup));
9797
tracing::info!(%address, "serving metrics");
9898
task::spawn(warp::serve(filter).bind(address))
9999
}
@@ -104,7 +104,7 @@ pub fn handle_metrics() -> impl Filter<Extract = (impl Reply,), Error = Rejectio
104104
warp::path("metrics").map(move || encode(registry))
105105
}
106106

107-
fn handle_liveness(
107+
fn handle_liveness_probe(
108108
liveness_checker: Arc<dyn LivenessChecking>,
109109
) -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone {
110110
warp::path("liveness").and_then(move || {
@@ -120,7 +120,7 @@ fn handle_liveness(
120120
})
121121
}
122122

123-
fn handle_readiness(
123+
fn handle_readiness_probe(
124124
readiness: Arc<Option<AtomicBool>>,
125125
) -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone {
126126
warp::path("ready").and_then(move || {
@@ -143,7 +143,7 @@ fn handle_readiness(
143143
})
144144
}
145145

146-
fn handle_startup(
146+
fn handle_startup_probe(
147147
startup: Arc<Option<AtomicBool>>,
148148
) -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone {
149149
warp::path("startup").and_then(move || {

0 commit comments

Comments
 (0)