Skip to content

Commit 93786e0

Browse files
committed
Added startup leader take over log
1 parent d37ceb6 commit 93786e0

File tree

3 files changed

+38
-14
lines changed

3 files changed

+38
-14
lines changed

crates/autopilot/src/run.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -539,7 +539,12 @@ pub async fn run(args: Arguments, shutdown_controller: ShutdownController) {
539539

540540
let liveness = Arc::new(Liveness::new(args.max_auction_age));
541541
let startup = Arc::new(Some(AtomicBool::new(false)));
542-
observe::metrics::serve_metrics(liveness.clone(), args.metrics_address, Default::default(), startup.clone());
542+
observe::metrics::serve_metrics(
543+
liveness.clone(),
544+
args.metrics_address,
545+
Default::default(),
546+
startup.clone(),
547+
);
543548

544549
let order_events_cleaner_config = crate::periodic_db_cleanup::OrderEventsCleanerConfig::new(
545550
args.order_events_cleanup_interval,
@@ -774,7 +779,12 @@ async fn shadow_mode(args: Arguments) -> ! {
774779
};
775780

776781
let liveness = Arc::new(Liveness::new(args.max_auction_age));
777-
observe::metrics::serve_metrics(liveness.clone(), args.metrics_address, Default::default(), Default::default());
782+
observe::metrics::serve_metrics(
783+
liveness.clone(),
784+
args.metrics_address,
785+
Default::default(),
786+
Default::default(),
787+
);
778788

779789
let current_block = ethrpc::block_stream::current_block_stream(
780790
args.shared.node_url,

crates/autopilot/src/run_loop.rs

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -143,33 +143,42 @@ impl RunLoop {
143143
} else {
144144
None
145145
};
146-
let mut was_leader = false;
146+
let mut was_leader: Option<bool> = leader.as_ref().map(|_| false);
147147

148148
while !control.should_shutdown() {
149-
let is_leader = if let Some(ref mut leader) = leader {
150-
leader.try_acquire().await.unwrap_or_else(|err| {
149+
let is_leader = match leader {
150+
Some(ref mut leader) => Some(leader.try_acquire().await.unwrap_or_else(|err| {
151151
tracing::error!(?err, "failed to become leader");
152152
Metrics::leader_lock_error();
153153
false
154-
})
155-
} else {
156-
true
154+
})),
155+
None => None,
157156
};
157+
let stepped_up = is_leader
158+
.zip(was_leader)
159+
.is_some_and(|(is_leader, was_leader)| is_leader && !was_leader);
158160

159-
if leader.is_some() && is_leader && !was_leader {
161+
if stepped_up {
160162
tracing::info!("Stepped up as a leader");
161163
Metrics::leader_step_up();
162-
}
163-
was_leader = is_leader;
164+
};
164165

165-
let start_block = self_arc.update_caches(&mut last_block, is_leader).await;
166+
let start_block = self_arc
167+
.update_caches(&mut last_block, is_leader.unwrap_or(true))
168+
.await;
166169

167170
// caches are warmed up, we're ready to do leader work
168171
if let Some(startup) = self_arc.probes.startup.as_ref() {
169172
startup.store(true, Ordering::Release);
170173
}
171174

172-
if !is_leader {
175+
if stepped_up {
176+
tracing::info!("Caches warmed up, stepped up as a leader and ready");
177+
}
178+
179+
was_leader = is_leader;
180+
181+
if !is_leader.unwrap_or(true) {
173182
// only the leader is supposed to run the auctions
174183
tokio::time::sleep(Duration::from_millis(200)).await;
175184
continue;

crates/orderbook/src/run.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -473,7 +473,12 @@ pub async fn run(args: Arguments) {
473473
let mut metrics_address = args.bind_address;
474474
metrics_address.set_port(DEFAULT_METRICS_PORT);
475475
tracing::info!(%metrics_address, "serving metrics");
476-
let metrics_task = serve_metrics(orderbook, metrics_address, Default::default(), Default::default());
476+
let metrics_task = serve_metrics(
477+
orderbook,
478+
metrics_address,
479+
Default::default(),
480+
Default::default(),
481+
);
477482

478483
futures::pin_mut!(serve_api);
479484
tokio::select! {

0 commit comments

Comments
 (0)