Skip to content

Commit 7b40325

Browse files
committed
DedupCache: Reset head if delta too high
1 parent 2009cd0 commit 7b40325

File tree

1 file changed

+7
-2
lines changed
  • crates/generic_log_worker/src

1 file changed

+7
-2
lines changed

crates/generic_log_worker/src/lib.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ pub use log_ops::upload_issuers;
1717
pub use sequencer_do::*;
1818

1919
use byteorder::{BigEndian, WriteBytesExt};
20-
use log::{info, Level};
20+
use log::{error, info, Level};
2121
use log_ops::UploadOptions;
2222
use metrics::{millis_diff_as_secs, AsF64, ObjectMetrics};
2323
use serde::{Deserialize, Serialize};
@@ -259,7 +259,7 @@ impl DedupCache {
259259
// Load batches of cache entries from DO storage into the in-memory cache.
260260
async fn load(&self, name: &str) -> Result<()> {
261261
info!("{name} DedupCache: Loading head");
262-
let head = self
262+
let mut head = self
263263
.storage
264264
.get::<usize>(Self::FIFO_HEAD_KEY)
265265
.await
@@ -271,6 +271,11 @@ impl DedupCache {
271271
.await
272272
.unwrap_or_default();
273273
info!("{name} DedupCache: head({head}) and tail({tail}) loaded");
274+
if tail - head > Self::MAX_BATCHES {
275+
head = tail - Self::MAX_BATCHES;
276+
error!("{name} DedupCache: delta too high, setting head to tail - MAX_BATCHES ({head})");
277+
self.storage.put(Self::FIFO_HEAD_KEY, head).await?;
278+
}
274279
let keys = (0..(tail - head)).map(Self::fifo_key).collect::<Vec<_>>();
275280
info!("{name} DedupCache: Getting keys");
276281
let map = self.storage.get_multiple(keys).await?;

0 commit comments

Comments
 (0)