@@ -19,6 +19,8 @@ use std::sync::{Arc, Mutex};
1919use std:: time:: Instant ;
2020use tokio:: task:: JoinHandle ;
2121
22+ const MAX_MSGS_TOTAL_RECOVERY_FAIL : usize = 75 ;
23+
2224pub async fn process_expired_epoch (
2325 conn : Arc < Mutex < DBConnection > > ,
2426 epoch_config : & EpochConfig ,
@@ -203,14 +205,25 @@ fn process_one_layer(
203205 if msgs. is_empty ( ) {
204206 continue ;
205207 }
206- msgs_len += msgs. len ( ) as i64 ;
207-
208- let msgs_len = msgs. len ( ) ;
209- let MsgRecoveryInfo { measurement, next_layer_messages, error_count } = recover_msgs ( msgs, & key) . map_err ( |e| {
210- debug ! ( "failed to recover {msgs_len} messages for threshold {threshold} on id {id} for tag {}: {e}" , hex:: encode( msg_tag) ) ;
211- e
212- } ) ?;
208+ let threshold_msgs_len = msgs. len ( ) ;
209+
210+ let MsgRecoveryInfo {
211+ measurement,
212+ next_layer_messages,
213+ error_count,
214+ } = match recover_msgs ( msgs, & key) {
215+ Ok ( info) => info,
216+ Err ( e) => {
217+ debug ! ( "failed to recover {threshold_msgs_len} messages for threshold {threshold} on id {id} for tag {}: {e}" , hex:: encode( msg_tag) ) ;
218+ if threshold_msgs_len <= MAX_MSGS_TOTAL_RECOVERY_FAIL {
219+ total_error_count += threshold_msgs_len;
220+ continue ;
221+ }
222+ return Err ( e. into ( ) ) ;
223+ }
224+ } ;
213225
226+ msgs_len += threshold_msgs_len as i64 ;
214227 metric_name = Some ( measurement. 0 ) ;
215228 metric_value = Some ( measurement. 1 ) ;
216229
@@ -227,6 +240,10 @@ fn process_one_layer(
227240 }
228241 }
229242
243+ if msgs_len == 0 {
244+ continue ;
245+ }
246+
230247 // create or update recovered msg with new count
231248 if let Some ( rec_msg) = existing_rec_msg {
232249 rec_msg. count += msgs_len;
0 commit comments