@@ -21,7 +21,6 @@ use crate::{
2121 protobuf:: drand:: { BeaconPacket , ChainInfoPacket , StartSyncRequest , SyncProgress } ,
2222 { debug, error, info, warn} ,
2323} ;
24- use energon:: traits:: Affine ;
2524use rand:: seq:: SliceRandom ;
2625use std:: time:: Duration ;
2726use tokio:: {
@@ -145,13 +144,16 @@ impl<S: Scheme, B: BeaconRepr> DefaultSyncer<S, B> {
145144 Ok ( syncer)
146145 }
147146
148- #[ rustfmt:: skip] // readability
147+ #[ rustfmt:: skip]
148+ #[ cfg( not( feature = "skip_db_verification" ) ) ]
149149 pub fn process_follow_request (
150150 self ,
151151 target : u64 ,
152152 tx : mpsc:: Sender < SyncProgressResponse > ,
153153 log : Logger ,
154154 ) -> JoinHandle < Result < ( ) , SyncError > > {
155+ use energon:: traits:: Affine ;
156+
155157 task:: spawn ( async move {
156158 let mut last_stored = self . store . last ( ) . await ?;
157159 if last_stored. round ( ) >= target {
@@ -173,7 +175,6 @@ impl<S: Scheme, B: BeaconRepr> DefaultSyncer<S, B> {
173175 error ! ( & log, "latest stored round {}, {err}" , last_stored. round( ) ) ;
174176 return Err ( err) ;
175177 }
176-
177178 let mut stream = match ProtocolClient :: new ( peer) . await {
178179 Ok ( mut client) => {
179180 match client. sync_chain ( from, self . info . beacon_id . to_string ( ) ) . await {
@@ -189,13 +190,11 @@ impl<S: Scheme, B: BeaconRepr> DefaultSyncer<S, B> {
189190 continue ;
190191 }
191192 } ;
192-
193193 while let Ok ( Some ( p) ) = stream. message ( ) . await {
194194 let Some ( ref meta) = p. metadata else {
195195 error ! ( & log, "stream: skipping {peer}, no metadata for round {}" , p. round) ;
196196 continue ' peers;
197197 } ;
198-
199198 if self . info . beacon_id . as_str ( ) != meta. beacon_id {
200199 error ! ( & log, "stream: skipping {peer}, invalid beacon_id {} for round {}" , meta. beacon_id, p. round) ;
201200 continue ' peers;
@@ -213,7 +212,6 @@ impl<S: Scheme, B: BeaconRepr> DefaultSyncer<S, B> {
213212 error ! ( & log, "stream: skipping peer {peer}: failed to deserialize signature for round {}" , p. round) ;
214213 continue ' peers;
215214 } ;
216-
217215 if super :: is_valid_signature :: < S > ( & self . info . public_key , last_stored. signature ( ) , p. round , & new_sig) {
218216 // Signature and round has been checked - beacon is valid.
219217 let valid_beacon = B :: from_packet ( p) ;
@@ -222,7 +220,6 @@ impl<S: Scheme, B: BeaconRepr> DefaultSyncer<S, B> {
222220 return Err ( SyncError :: ChainStore ( err) ) ;
223221 }
224222 last_stored = valid_beacon;
225-
226223 // Report sync progress to control client side.
227224 if tx. send ( Ok ( SyncProgress { current : last_stored. round ( ) , target, metadata : None } ) ) . await . is_err ( ) {
228225 debug ! ( & log, "sync request cancelled: synced {}, latest_stored {}" , last_stored. round( ) - started_from, last_stored. round( ) ) ;
@@ -238,12 +235,102 @@ impl<S: Scheme, B: BeaconRepr> DefaultSyncer<S, B> {
238235 }
239236 }
240237 }
241-
242238 if last_stored. round ( ) != target {
243239 let err = SyncError :: TriedAllPers {
244240 last : last_stored. round ( ) ,
245241 } ;
242+ let _ = tx. send ( Err ( Status :: cancelled ( err. to_string ( ) ) ) ) . await ;
243+ error ! ( & log, "sync request finished: {err}" ) ;
244+ return Err ( err) ;
245+ }
246+
247+ Ok ( ( ) )
248+ } )
249+ }
250+
251+ #[ rustfmt:: skip]
252+ #[ cfg( feature = "skip_db_verification" ) ]
253+ pub fn process_follow_request (
254+ self ,
255+ target : u64 ,
256+ tx : mpsc:: Sender < SyncProgressResponse > ,
257+ log : Logger ,
258+ ) -> JoinHandle < Result < ( ) , SyncError > > {
259+ task:: spawn ( async move {
260+ let mut last_stored = self . store . last ( ) . await ?;
261+ if last_stored. round ( ) >= target {
262+ warn ! ( & log, "sync request rejected: target {target}, latest_stored {}" , last_stored. round( ) ) ;
263+ return Ok ( ( ) ) ;
264+ }
265+ info ! ( & log, "processing sync request: target {target}, latest_stored {}" , last_stored. round( ) ) ;
266+ warn ! ( & log, "skipping beacons validity checks - use for tests only!" ) ;
267+
268+ let started_from = last_stored. round ( ) ;
269+ if target - started_from > LOGS_TO_SKIP {
270+ debug ! ( & log, "logging will use rate limiting {LOGS_TO_SKIP}" ) ;
271+ }
272+
273+ // Peers are randomly sorted on configuration step (see [start_follow_chain]).
274+ ' peers: for peer in & self . peers {
275+ let from = last_stored. round ( ) + 1 ;
276+ if target < from {
277+ let err = SyncError :: InvalidTarget { from, target } ;
278+ error ! ( & log, "latest stored round {}, {err}" , last_stored. round( ) ) ;
279+ return Err ( err) ;
280+ }
281+ let mut stream = match ProtocolClient :: new ( peer) . await {
282+ Ok ( mut client) => {
283+ match client. sync_chain ( from, self . info . beacon_id . to_string ( ) ) . await {
284+ Ok ( stream) => stream,
285+ Err ( err) => {
286+ error ! ( & log, "skipping {peer}, failed to get stream: {err}" ) ;
287+ continue ;
288+ }
289+ }
290+ }
291+ Err ( err) => {
292+ error ! ( & log, "skipping {peer}, unable to create client: {err}" , ) ;
293+ continue ;
294+ }
295+ } ;
296+ while let Ok ( Some ( p) ) = stream. message ( ) . await {
297+ let Some ( ref meta) = p. metadata else {
298+ error ! ( & log, "stream: skipping {peer}, no metadata for round {}" , p. round) ;
299+ continue ' peers;
300+ } ;
301+ if self . info . beacon_id . as_str ( ) != meta. beacon_id {
302+ error ! ( & log, "stream: skipping {peer}, invalid beacon_id {} for round {}" , meta. beacon_id, p. round) ;
303+ continue ' peers;
304+ }
305+ if p. round != last_stored. round ( ) + 1 {
306+ warn ! ( & log, "round expected {}, received {}, CONTINUE.." , last_stored. round( ) + 1 , p. round) ;
307+ }
308+ if target - p. round < LOGS_TO_SKIP || p. round % LOGS_TO_SKIP == 0 {
309+ debug ! ( & log, "new_beacon_fetched: peer {peer}, from_round {from}, got_round {}" , p. round) ;
310+ }
246311
312+ let unchecked_beacon = B :: from_packet ( p) ;
313+ if let Err ( err) = self . store . put ( unchecked_beacon. clone ( ) ) . await {
314+ error ! ( & log, "failed to store beacon for round {}: {err}" , unchecked_beacon. round( ) ) ;
315+ return Err ( SyncError :: ChainStore ( err) ) ;
316+ }
317+ last_stored = unchecked_beacon;
318+
319+ // Report sync progress to control client side.
320+ if tx. send ( Ok ( SyncProgress { current : last_stored. round ( ) , target, metadata : None } ) ) . await . is_err ( ) {
321+ debug ! ( & log, "sync request cancelled: synced {}, latest_stored {}" , last_stored. round( ) - started_from, last_stored. round( ) ) ;
322+ return Ok ( ( ) ) ;
323+ }
324+ if last_stored. round ( ) == target {
325+ debug ! ( & log, "finished syncing up_to {target} round" ) ;
326+ return Ok ( ( ) ) ;
327+ }
328+ }
329+ }
330+ if last_stored. round ( ) != target {
331+ let err = SyncError :: TriedAllPers {
332+ last : last_stored. round ( ) ,
333+ } ;
247334 let _ = tx. send ( Err ( Status :: cancelled ( err. to_string ( ) ) ) ) . await ;
248335 error ! ( & log, "sync request finished: {err}" ) ;
249336 return Err ( err) ;
0 commit comments