
Commit 5090dc1

Replace readiness probe with startup for autopilot (#3856)
# Description

The zero downtime deployment for autopilot requires the newly created instance to warm up its caches before the previous one can step down and leadership can be transferred. The previous implementation reported readiness only after taking over leadership, and the readiness probe was not used, so this mechanism failed to work.

# Changes

Remove the usage of the readiness probe in autopilot. Introduce a startup probe instead. Signal that startup has finished once the caches are warmed up, regardless of leader lock status.

## How to test

1. Deploy the PR along with infrastructure's cowprotocol/infrastructure#3855 to staging.
2. Observe that the previous autopilot shuts down only after the new one finishes warming up its caches.
3. The transition between the old autopilot shutting down and the new one acquiring the leader lock should be quick (a couple of milliseconds).

---------

Co-authored-by: José Duarte <[email protected]>
Co-authored-by: ilya <[email protected]>
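The key behavioural point above is that the new flag flips as soon as the caches are warm, independent of whether the instance holds the leader lock, so the deployment can hold off retiring the previous instance until the new one reports that startup has finished. The HTTP endpoint that exposes this flag is wired up inside `observe::metrics::serve_metrics` and is not part of this commit; the snippet below is only a rough sketch of the check such an endpoint presumably performs. The handler name `startup_probe_status` and the exact status-code mapping are illustrative assumptions, not the crate's API.

```rust
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};

/// Hypothetical handler: maps the shared startup flag to an HTTP status code.
/// `None` means no startup probe is wired up (shadow mode passes
/// `Default::default()`), so nothing gates on it.
fn startup_probe_status(startup: &Arc<Option<AtomicBool>>) -> u16 {
    match startup.as_ref() {
        Some(flag) if flag.load(Ordering::Acquire) => 200, // caches are warm
        Some(_) => 503, // still warming up; keep the previous instance around
        None => 200,
    }
}

fn main() {
    let startup = Arc::new(Some(AtomicBool::new(false)));
    assert_eq!(startup_probe_status(&startup), 503);

    // The run loop flips the flag once its caches are warmed up,
    // regardless of whether it holds the leader lock.
    if let Some(flag) = startup.as_ref() {
        flag.store(true, Ordering::Release);
    }
    assert_eq!(startup_probe_status(&startup), 200);
}
```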
1 parent 5133bd5 commit 5090dc1

File tree

7 files changed (+208, -66 lines)

crates/autopilot/src/leader_lock_tracker.rs (new file)

Lines changed: 131 additions & 0 deletions
@@ -0,0 +1,131 @@
+use {database::leader_pg_lock::LeaderLock, observe::metrics};
+
+/// Tracks the autopilot leader lock status.
+/// Leader lock status is tracked based on calls to try_acquire()
+#[expect(clippy::large_enum_variant)]
+pub enum LeaderLockTracker {
+    /// Leader lock mechanism is disabled.
+    /// Only one autopilot instance should be running at all times.
+    Disabled,
+    /// Leader lock mechanism is enabled.
+    /// This allows for multiple autopilots to run simultaneously with only one
+    /// driving the auctions forward.
+    /// The autopilot followers (not holding the lock) will keep their caches
+    /// warm. It facilitates zero downtime deployments.
+    Enabled {
+        /// Whether the current instance is the leader since the last call to
+        /// try_acquire()
+        is_leader: bool,
+        /// Whether the instance was the leader since the last call to
+        /// try_acquire()
+        was_leader: bool,
+        leader_lock: LeaderLock,
+    },
+}
+
+impl LeaderLockTracker {
+    pub fn new(leader_lock: Option<LeaderLock>) -> Self {
+        match leader_lock {
+            Some(leader_lock) => Self::Enabled {
+                is_leader: false,
+                was_leader: false,
+                leader_lock,
+            },
+            None => Self::Disabled,
+        }
+    }
+
+    /// Tries to acquire the leader lock if it's enabled
+    /// If not, does nothing
+    /// Should be called at the beginning of every run loop iteration
+    pub async fn try_acquire(&mut self) {
+        let Self::Enabled {
+            is_leader,
+            was_leader,
+            leader_lock,
+        } = self
+        else {
+            return;
+        };
+
+        *was_leader = *is_leader;
+
+        *is_leader = leader_lock.try_acquire().await.unwrap_or_else(|err| {
+            tracing::error!(?err, "failed to become leader");
+            Metrics::leader_lock_error();
+            false
+        });
+
+        if self.just_stepped_up() {
+            tracing::info!("Stepped up as a leader");
+            Metrics::leader_step_up();
+        }
+    }
+
+    /// Releases the leader lock if it was held
+    /// Should be called after breaking out of run loop (for example: due to
+    /// shutdown)
+    pub async fn release(self) {
+        if let Self::Enabled {
+            mut leader_lock,
+            is_leader: true,
+            ..
+        } = self
+        {
+            tracing::info!("Shutdown received, stepping down as the leader");
+            leader_lock.release().await;
+            Metrics::leader_step_down();
+        }
+    }
+
+    /// Returns true if the last try_acquire call resulted in acquiring the
+    /// leader lock. If the feature is disabled, always returns false
+    pub fn just_stepped_up(&self) -> bool {
+        matches!(
+            self,
+            Self::Enabled {
+                is_leader: true,
+                was_leader: false,
+                ..
+            }
+        )
+    }
+
+    /// Returns true if the leader lock is being held
+    /// If the feature is disabled, always returns true
+    pub fn is_leader(&self) -> bool {
+        match self {
+            Self::Enabled { is_leader, .. } => *is_leader,
+            _ => true,
+        }
+    }
+}
+#[derive(prometheus_metric_storage::MetricStorage)]
+#[metric(subsystem = "leader_lock_tracker")]
+struct Metrics {
+    /// Tracks the current leader status
+    /// 1 - is currently autopilot leader
+    /// 0 - is currently autopilot follower
+    is_leader: prometheus::IntGauge,
+
+    /// Tracks the count of errors acquiring leader lock (should never happen)
+    leader_lock_error: prometheus::IntCounter,
+}
+
+impl Metrics {
+    fn get() -> &'static Self {
+        Metrics::instance(metrics::get_storage_registry()).unwrap()
+    }
+
+    fn leader_step_up() {
+        Self::get().is_leader.set(1)
+    }
+
+    fn leader_step_down() {
+        Self::get().is_leader.set(0)
+    }
+
+    fn leader_lock_error() {
+        Self::get().leader_lock_error.inc()
+    }
+}
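To make the `was_leader`/`is_leader` bookkeeping above easier to follow, here is a small self-contained sketch of just that state transition. A plain `bool` stands in for the result of the real `LeaderLock::try_acquire` (a database-backed lock from `database::leader_pg_lock`, not constructible here); the type name `TrackerSketch` is illustrative only.

```rust
/// Simplified stand-in for the tracker's Enabled state; `lock_held` plays
/// the role of the LeaderLock::try_acquire result.
struct TrackerSketch {
    is_leader: bool,
    was_leader: bool,
}

impl TrackerSketch {
    fn try_acquire(&mut self, lock_held: bool) {
        // Same shift as in LeaderLockTracker::try_acquire above.
        self.was_leader = self.is_leader;
        self.is_leader = lock_held;
    }

    fn just_stepped_up(&self) -> bool {
        self.is_leader && !self.was_leader
    }
}

fn main() {
    let mut t = TrackerSketch { is_leader: false, was_leader: false };

    t.try_acquire(true);
    assert!(t.just_stepped_up()); // follower -> leader: step-up fires once

    t.try_acquire(true);
    assert!(!t.just_stepped_up()); // still leader: no repeated step-up

    t.try_acquire(false);
    assert!(!t.just_stepped_up()); // lost the lock: back to follower
}
```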

crates/autopilot/src/lib.rs

Lines changed: 1 addition & 0 deletions
@@ -4,6 +4,7 @@ pub mod database;
 pub mod domain;
 pub mod event_updater;
 pub mod infra;
+mod leader_lock_tracker;
 mod maintenance;
 pub mod periodic_db_cleanup;
 pub mod run;

crates/autopilot/src/run.rs

Lines changed: 17 additions & 5 deletions
@@ -538,8 +538,13 @@ pub async fn run(args: Arguments, shutdown_controller: ShutdownController) {
     );

     let liveness = Arc::new(Liveness::new(args.max_auction_age));
-    let readiness = Arc::new(Some(AtomicBool::new(false)));
-    observe::metrics::serve_metrics(liveness.clone(), args.metrics_address, readiness.clone());
+    let startup = Arc::new(Some(AtomicBool::new(false)));
+    observe::metrics::serve_metrics(
+        liveness.clone(),
+        args.metrics_address,
+        Default::default(),
+        startup.clone(),
+    );

     let order_events_cleaner_config = crate::periodic_db_cleanup::OrderEventsCleanerConfig::new(
         args.order_events_cleanup_interval,
@@ -691,8 +696,10 @@ pub async fn run(args: Arguments, shutdown_controller: ShutdownController) {
         solver_participation_guard,
         solvable_orders_cache,
         trusted_tokens,
-        liveness.clone(),
-        readiness.clone(),
+        run_loop::Probes {
+            liveness: liveness.clone(),
+            startup,
+        },
         Arc::new(maintenance),
         competition_updates_sender,
     );
@@ -772,7 +779,12 @@ async fn shadow_mode(args: Arguments) -> ! {
     };

     let liveness = Arc::new(Liveness::new(args.max_auction_age));
-    observe::metrics::serve_metrics(liveness.clone(), args.metrics_address, Default::default());
+    observe::metrics::serve_metrics(
+        liveness.clone(),
+        args.metrics_address,
+        Default::default(),
+        Default::default(),
+    );

     let current_block = ethrpc::block_stream::current_block_stream(
         args.shared.node_url,
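A note on the `Default::default()` arguments passed to `serve_metrics` above (presumably the now-unused readiness slot, and in shadow mode the startup slot as well): the default of `Arc<Option<AtomicBool>>` wraps `None`, so the corresponding probe is simply absent rather than permanently unready. A standalone sketch, not the crate's code:

```rust
use std::sync::{atomic::AtomicBool, Arc};

fn main() {
    // Default for Arc<Option<AtomicBool>> is Arc::new(None): there is no
    // flag to flip, so the probe has nothing to gate on.
    let no_probe: Arc<Option<AtomicBool>> = Default::default();
    assert!(no_probe.is_none());
}
```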

crates/autopilot/src/run_loop.rs

Lines changed: 23 additions & 56 deletions
@@ -20,6 +20,7 @@ use {
             self,
             solvers::dto::{settle, solve},
         },
+        leader_lock_tracker::LeaderLockTracker,
         maintenance::Maintenance,
         run::Liveness,
         shutdown_controller::ShutdownController,
@@ -68,6 +69,11 @@
     pub enable_leader_lock: bool,
 }

+pub struct Probes {
+    pub liveness: Arc<Liveness>,
+    pub startup: Arc<Option<AtomicBool>>,
+}
+
 pub struct RunLoop {
     config: Config,
     eth: infra::Ethereum,
@@ -77,8 +83,7 @@
     solvable_orders_cache: Arc<SolvableOrdersCache>,
     trusted_tokens: AutoUpdatingTokenList,
     in_flight_orders: Arc<Mutex<HashSet<OrderUid>>>,
-    liveness: Arc<Liveness>,
-    readiness: Arc<Option<AtomicBool>>,
+    probes: Probes,
     /// Maintenance tasks that should run before every runloop to have
     /// the most recent data available.
     maintenance: Arc<Maintenance>,
@@ -96,8 +101,7 @@
         solver_participation_guard: SolverParticipationGuard,
         solvable_orders_cache: Arc<SolvableOrdersCache>,
         trusted_tokens: AutoUpdatingTokenList,
-        liveness: Arc<Liveness>,
-        readiness: Arc<Option<AtomicBool>>,
+        probes: Probes,
         maintenance: Arc<Maintenance>,
         competition_updates_sender: tokio::sync::mpsc::UnboundedSender<()>,
     ) -> Self {
@@ -113,8 +117,7 @@
            solvable_orders_cache,
            trusted_tokens,
            in_flight_orders: Default::default(),
-           liveness,
-           readiness,
+           probes,
            maintenance,
            competition_updates_sender,
            winner_selection: winner_selection::Arbitrator { max_winners, weth },
@@ -131,7 +134,7 @@
        let mut last_block = None;

        let self_arc = Arc::new(self);
-       let mut leader = if self_arc.config.enable_leader_lock {
+       let leader = if self_arc.config.enable_leader_lock {
            Some(
                self_arc
                    .persistence
@@ -141,37 +144,26 @@
        } else {
            None
        };
-       let mut was_leader = false;
+       let mut leader_lock_tracker = LeaderLockTracker::new(leader);

        while !control.should_shutdown() {
-           let is_leader = if let Some(ref mut leader) = leader {
-               leader.try_acquire().await.unwrap_or_else(|err| {
-                   tracing::error!(?err, "failed to become leader");
-                   Metrics::leader_lock_error();
-                   false
-               })
-           } else {
-               true
-           };
+           leader_lock_tracker.try_acquire().await;

-           if leader.is_some() && is_leader && !was_leader {
-               tracing::info!("Stepped up as a leader");
-               Metrics::leader_step_up();
+           let start_block = self_arc
+               .update_caches(&mut last_block, leader_lock_tracker.is_leader())
+               .await;
+
+           // caches are warmed up, we're ready to do leader work
+           if let Some(startup) = self_arc.probes.startup.as_ref() {
+               startup.store(true, Ordering::Release);
            }
-           was_leader = is_leader;

-           let start_block = self_arc.update_caches(&mut last_block, is_leader).await;
-           if !is_leader {
+           if !leader_lock_tracker.is_leader() {
                // only the leader is supposed to run the auctions
                tokio::time::sleep(Duration::from_millis(200)).await;
                continue;
            }

-           // caches are warmed up, we're ready to do leader work
-           if let Some(readiness) = self_arc.readiness.as_ref() {
-               readiness.store(true, Ordering::Release);
-           }
-
            if let Some(auction) = self_arc
                .next_auction(start_block, &mut last_auction, &mut last_block)
                .await
@@ -183,12 +175,7 @@
                    .await
            }
        }
-
-       if let Some(mut leader) = leader {
-           tracing::info!("Shutdown received, stepping down as the leader");
-           leader.release().await;
-           Metrics::leader_step_down();
-       }
+       leader_lock_tracker.release().await;
    }

    async fn update_caches(&self, prev_block: &mut Option<H256>, is_leader: bool) -> BlockInfo {
@@ -248,7 +235,7 @@
        }

        observe::log_auction_delta(&previous, &auction);
-       self.liveness.auction();
+       self.probes.liveness.auction();
        Metrics::auction_ready(start_block.observed_at);
        Some(auction)
    }
@@ -284,7 +271,7 @@

        if auction.orders.is_empty() {
            // Updating liveness probe to not report unhealthy due to this optimization
-           self.liveness.auction();
+           self.probes.liveness.auction();
            tracing::debug!("skipping empty auction");
            return None;
        }
@@ -1003,14 +990,6 @@ struct Metrics {
    /// function is started.
    #[metric(buckets(0, 0.25, 0.5, 0.75, 1, 1.5, 2, 2.5, 3, 4, 5, 6))]
    current_block_delay: prometheus::Histogram,
-
-   /// Tracks the current leader status
-   /// 1 - is currently autopilot leader
-   /// 0 - is currently autopilot follower
-   is_leader: prometheus::IntGauge,
-
-   /// Trackes the count of errors acquiring leader lock (should never happen)
-   leader_lock_error: prometheus::IntCounter,
 }

 impl Metrics {
@@ -1111,18 +1090,6 @@
            .current_block_delay
            .observe(init_block_timestamp.elapsed().as_secs_f64())
    }
-
-   fn leader_step_up() {
-       Self::get().is_leader.set(1)
-   }
-
-   fn leader_step_down() {
-       Self::get().is_leader.set(0)
-   }
-
-   fn leader_lock_error() {
-       Self::get().leader_lock_error.inc()
-   }
 }

 pub mod observe {
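The behavioural effect of the run-loop reordering above, shown in isolation: the startup flag is now set right after the caches are updated and before the leader check, so a follower instance also reports that it has started. The following is a self-contained sketch of just that ordering; all names are illustrative placeholders, not the crate's APIs.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// Placeholders standing in for the real cache-update and auction steps.
fn update_caches() { /* refresh caches for leader and followers alike */ }
fn run_auction() { /* leader-only work */ }

fn one_iteration(is_leader: bool, startup: &Option<AtomicBool>) {
    update_caches();

    // New ordering: flip the startup flag before the leader check,
    // so a follower with warm caches also counts as "started".
    if let Some(flag) = startup {
        flag.store(true, Ordering::Release);
    }

    if !is_leader {
        // Followers skip auction work but keep looping to stay warm.
        return;
    }

    run_auction();
}

fn main() {
    let startup = Some(AtomicBool::new(false));
    one_iteration(false, &startup); // a follower iteration
    // Even without leadership, the probe now reports startup finished.
    assert!(startup.as_ref().unwrap().load(Ordering::Acquire));
}
```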
