11use std:: ops:: RangeInclusive ;
2+ use std:: time:: Duration ;
23
34use apollo_config:: secrets:: Sensitive ;
45use async_trait:: async_trait;
@@ -15,11 +16,26 @@ pub mod cyclic_base_layer_wrapper_test;
1516#[ derive( Debug ) ]
1617pub struct CyclicBaseLayerWrapper < B : BaseLayerContract + Send + Sync > {
1718 base_layer : B ,
19+ retry_primary_interval : Duration ,
20+ last_primary_retry : tokio:: time:: Instant ,
1821}
1922
2023impl < B : BaseLayerContract + Send + Sync > CyclicBaseLayerWrapper < B > {
21- pub fn new ( base_layer : B ) -> Self {
22- Self { base_layer }
24+ pub fn new ( base_layer : B , retry_primary_interval : Duration ) -> Self {
25+ Self { base_layer, retry_primary_interval, last_primary_retry : tokio:: time:: Instant :: now ( ) }
26+ }
27+
28+ // Retries the primary endpoint once the interval has elapsed since we left it. Does nothing
29+ // while already on the primary, so the timer is untouched until a failover moves us off it.
30+ async fn retry_primary_if_due ( & mut self ) -> Result < ( ) , B :: Error > {
31+ if self . base_layer . is_at_primary ( ) . await ? {
32+ return Ok ( ( ) ) ;
33+ }
34+ if self . last_primary_retry . elapsed ( ) >= self . retry_primary_interval {
35+ self . last_primary_retry = tokio:: time:: Instant :: now ( ) ;
36+ self . base_layer . reset_provider_url_to_primary ( ) . await ?;
37+ }
38+ Ok ( ( ) )
2339 }
2440
2541 // Check the result of a function call to the base layer. If it fails, cycle the URL and signal
@@ -38,11 +54,21 @@ impl<B: BaseLayerContract + Send + Sync> CyclicBaseLayerWrapper<B> {
3854 let Ok ( current_url) = current_url_result else {
3955 return Some ( Err ( current_url_result. expect_err ( "result is checked at let-else" ) ) ) ;
4056 } ;
57+ // Record whether we are about to leave the primary, before cycling away from it.
58+ let is_at_primary_result = self . base_layer . is_at_primary ( ) . await ;
59+ let Ok ( was_at_primary) = is_at_primary_result else {
60+ return Some ( Err ( is_at_primary_result. expect_err ( "result is checked at let-else" ) ) ) ;
61+ } ;
4162 // Otherwise, cycle the URL so we can try again. Return error in case it fails to cycle.
4263 let cycle_url_result = self . base_layer . cycle_provider_url ( ) . await ;
4364 let Ok ( ( ) ) = cycle_url_result else {
4465 return Some ( Err ( cycle_url_result. expect_err ( "result is checked at let-else" ) ) ) ;
4566 } ;
67+ // Restart the retry-primary clock only when this failover leaves the primary, so the wait
68+ // is measured from when we left it; cycling between backups must not push the retry out.
69+ if was_at_primary {
70+ self . last_primary_retry = tokio:: time:: Instant :: now ( ) ;
71+ }
4672 // Get the new URL (return error in case it fails to get it).
4773 let new_url_result = self . base_layer . get_url ( ) . await ;
4874 let Ok ( new_url) = new_url_result else {
@@ -69,6 +95,7 @@ impl<B: BaseLayerContract + Send + Sync> BaseLayerContract for CyclicBaseLayerWr
6995 & mut self ,
7096 l1_block : L1BlockNumber ,
7197 ) -> Result < BlockHashAndNumber , Self :: Error > {
98+ self . retry_primary_if_due ( ) . await ?;
7299 let start_url = self . base_layer . get_url ( ) . await ?;
73100 loop {
74101 let result = self . base_layer . get_proved_block_at ( l1_block) . await ;
@@ -79,6 +106,7 @@ impl<B: BaseLayerContract + Send + Sync> BaseLayerContract for CyclicBaseLayerWr
79106 }
80107
81108 async fn latest_l1_block_number ( & mut self ) -> Result < L1BlockNumber , Self :: Error > {
109+ self . retry_primary_if_due ( ) . await ?;
82110 let start_url = self . base_layer . get_url ( ) . await ?;
83111 loop {
84112 let result = self . base_layer . latest_l1_block_number ( ) . await ;
@@ -92,6 +120,7 @@ impl<B: BaseLayerContract + Send + Sync> BaseLayerContract for CyclicBaseLayerWr
92120 & mut self ,
93121 block_number : L1BlockNumber ,
94122 ) -> Result < Option < L1BlockReference > , Self :: Error > {
123+ self . retry_primary_if_due ( ) . await ?;
95124 let start_url = self . base_layer . get_url ( ) . await ?;
96125 loop {
97126 let result = self . base_layer . l1_block_at ( block_number) . await ;
@@ -106,6 +135,7 @@ impl<B: BaseLayerContract + Send + Sync> BaseLayerContract for CyclicBaseLayerWr
106135 block_range : RangeInclusive < L1BlockNumber > ,
107136 event_identifiers : & ' a [ & ' a str ] ,
108137 ) -> Result < Vec < L1Event > , Self :: Error > {
138+ self . retry_primary_if_due ( ) . await ?;
109139 let start_url = self . base_layer . get_url ( ) . await ?;
110140 loop {
111141 let result = self . base_layer . events ( block_range. clone ( ) , event_identifiers) . await ;
@@ -119,6 +149,7 @@ impl<B: BaseLayerContract + Send + Sync> BaseLayerContract for CyclicBaseLayerWr
119149 & mut self ,
120150 block_number : L1BlockNumber ,
121151 ) -> Result < Option < L1BlockHeader > , Self :: Error > {
152+ self . retry_primary_if_due ( ) . await ?;
122153 let start_url = self . base_layer . get_url ( ) . await ?;
123154 loop {
124155 let result = self . base_layer . get_block_header ( block_number) . await ;
@@ -128,6 +159,8 @@ impl<B: BaseLayerContract + Send + Sync> BaseLayerContract for CyclicBaseLayerWr
128159 }
129160 }
130161
162+ // Takes &self so it cannot cycle or retry endpoints; callers needing resilience use the &mut
163+ // self methods.
131164 async fn get_block_header_immutable (
132165 & self ,
133166 block_number : L1BlockNumber ,
@@ -146,4 +179,12 @@ impl<B: BaseLayerContract + Send + Sync> BaseLayerContract for CyclicBaseLayerWr
146179 async fn cycle_provider_url ( & mut self ) -> Result < ( ) , Self :: Error > {
147180 self . base_layer . cycle_provider_url ( ) . await
148181 }
182+
183+ async fn reset_provider_url_to_primary ( & mut self ) -> Result < ( ) , Self :: Error > {
184+ self . base_layer . reset_provider_url_to_primary ( ) . await
185+ }
186+
187+ async fn is_at_primary ( & self ) -> Result < bool , Self :: Error > {
188+ self . base_layer . is_at_primary ( ) . await
189+ }
149190}
0 commit comments