112 changes: 78 additions & 34 deletions crates/generic_log_worker/src/lib.rs
@@ -28,11 +28,12 @@ use std::cell::RefCell;
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::io::Write;
+ use std::time::Duration;
use tlog_tiles::{LookupKey, PendingLogEntry, SequenceMetadata};
use tokio::sync::Mutex;
use util::now_millis;
use worker::{
- js_sys, kv, kv::KvStore, wasm_bindgen, Bucket, Env, Error, HttpMetadata, Result, State,
+ js_sys, kv, kv::KvStore, wasm_bindgen, Bucket, Delay, Env, Error, HttpMetadata, Result, State,
Storage, Stub,
};

@@ -513,6 +514,36 @@ impl LockBackend for State {
}
}

+ // R2 retry config: 3 retries with exponential backoff (100ms -> 200ms -> 400ms).
+ pub const R2_MAX_RETRIES: u32 = 3;
+ pub const R2_BASE_DELAY_MS: u64 = 100;
+
+ /// Retries an async operation with exponential backoff.
+ ///
+ /// # Errors
+ ///
+ /// Returns the last error if all retry attempts fail.
+ pub async fn with_retry<T, F, Fut>(max_retries: u32, base_delay_ms: u64, operation: F) -> Result<T>
+ where
+ F: Fn() -> Fut,
+ Fut: std::future::Future<Output = Result<T>>,
+ {
+ let mut last_error = None;
+ for attempt in 0..=max_retries {
+ match operation().await {
+ Ok(result) => return Ok(result),
+ Err(e) => {
+ last_error = Some(e);
+ if attempt < max_retries {
+ let delay_ms = base_delay_ms * (1 << attempt);
+ Delay::from(Duration::from_millis(delay_ms)).await;
+ }
+ }
+ }
+ }
+ Err(last_error.expect("with_retry: at least one attempt should have been made"))
+ }
+
pub trait ObjectBackend {
/// Upload the object with the given key and data to the object backend,
/// adding additional HTTP metadata headers based on the provided options.
@@ -561,31 +592,39 @@ impl ObjectBackend for ObjectBucket {
opts: &UploadOptions,
) -> Result<()> {
let start = now_millis();
- let mut metadata = HttpMetadata::default();
- if let Some(content_type) = &opts.content_type {
- metadata.content_type = Some(content_type.to_string());
+ let content_type = opts
+ .content_type
+ .clone()
+ .unwrap_or_else(|| "application/octet-stream".into());
+ let cache_control = if opts.immutable {
+ "public, max-age=604800, immutable"
} else {
- metadata.content_type = Some("application/octet-stream".into());
- }
- if opts.immutable {
- metadata.cache_control = Some("public, max-age=604800, immutable".into());
- } else {
- metadata.cache_control = Some("no-store".into());
- }
+ "no-store"
+ };
let value: Vec<u8> = data.into();
+ let key_str = key.as_ref();
self.metrics
.as_ref()
.inspect(|&m| m.upload_size_bytes.observe(value.len().as_f64()));
- self.bucket
- .put(key.as_ref(), value.clone())
- .http_metadata(metadata)
- .execute()
- .await
- .inspect_err(|_| {
- self.metrics
- .as_ref()
- .inspect(|&m| m.errors.with_label_values(&["put"]).inc());
- })?;

+ with_retry(R2_MAX_RETRIES, R2_BASE_DELAY_MS, || async {
+ let metadata = HttpMetadata {
+ content_type: Some(content_type.clone()),
+ cache_control: Some(cache_control.into()),
+ ..Default::default()
+ };
+ self.bucket
+ .put(key_str, value.clone())
+ .http_metadata(metadata)
+ .execute()
+ .await
+ })
+ .await
+ .inspect_err(|_| {
+ self.metrics
+ .as_ref()
+ .inspect(|&m| m.errors.with_label_values(&["put"]).inc());
+ })?;

self.metrics.as_ref().inspect(|&m| {
m.duration
@@ -598,20 +637,25 @@

async fn fetch<S: AsRef<str>>(&self, key: S) -> Result<Option<Vec<u8>>> {
let start = now_millis();
- let res = match self.bucket.get(key.as_ref()).execute().await? {
- Some(obj) => {
- let body = obj
- .body()
- .ok_or_else(|| format!("missing object body: {}", key.as_ref()))?;
- let bytes = body.bytes().await.inspect_err(|_| {
- self.metrics.as_ref().inspect(|&m| {
- m.errors.with_label_values(&["get"]).inc();
- });
- })?;
- Ok(Some(bytes))
+ let key_str = key.as_ref();
+ let res = with_retry(R2_MAX_RETRIES, R2_BASE_DELAY_MS, || async {
+ match self.bucket.get(key_str).execute().await? {
+ Some(obj) => {
+ let body = obj
+ .body()
+ .ok_or_else(|| format!("missing object body: {}", key_str))?;
+ let bytes = body.bytes().await?;
+ Ok(Some(bytes))
+ }
+ None => Ok(None),
}
- None => Ok(None),
- };
+ })
+ .await
+ .inspect_err(|_| {
+ self.metrics
+ .as_ref()
+ .inspect(|&m| m.errors.with_label_values(&["get"]).inc());
+ });
self.metrics.as_ref().inspect(|&m| {
m.duration
.with_label_values(&["get"])
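A note on the constants introduced above: `R2_MAX_RETRIES = 3` means up to four attempts in total (the initial try plus three retries), with backoff sleeps of 100 ms, 200 ms, and 400 ms between failed attempts. A minimal usage sketch of `with_retry` follows; the `fetch_config` helper is illustrative and not part of this change:

```rust
use generic_log_worker::{with_retry, R2_BASE_DELAY_MS, R2_MAX_RETRIES};
use worker::Result;

// Hypothetical fallible async operation, standing in for an R2 call.
async fn fetch_config() -> Result<Vec<u8>> {
    Ok(Vec::new())
}

// Up to four attempts total; sleeps 100ms, 200ms, then 400ms between failures.
async fn load_config_with_retries() -> Result<Vec<u8>> {
    with_retry(R2_MAX_RETRIES, R2_BASE_DELAY_MS, || async { fetch_config().await }).await
}
```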
69 changes: 46 additions & 23 deletions crates/mtc_worker/src/sequencer_do.rs
@@ -9,8 +9,8 @@ use crate::{load_checkpoint_cosigner, load_origin, CONFIG};
use generic_log_worker::{
get_durable_object_name, load_public_bucket,
log_ops::{prove_subtree_consistency, ProofError},
- CachedRoObjectBucket, CheckpointCallbacker, GenericSequencer, ObjectBucket, SequencerConfig,
- SEQUENCER_BINDING,
+ with_retry, CachedRoObjectBucket, CheckpointCallbacker, GenericSequencer, ObjectBucket,
+ SequencerConfig, R2_BASE_DELAY_MS, R2_MAX_RETRIES, SEQUENCER_BINDING,
};
use mtc_api::{
BootstrapMtcLogEntry, LandmarkSequence, LANDMARK_BUNDLE_KEY, LANDMARK_CHECKPOINT_KEY,
@@ -132,28 +132,47 @@ fn checkpoint_callback(env: &Env, name: &str) -> CheckpointCallbacker {
// https://github.com/cloudflare/workers-rs/issues/876

// Load current landmark sequence.
- let mut seq =
- if let Some(obj) = bucket_clone.get(LANDMARK_KEY).execute().await? {
- let bytes = obj.body().ok_or("missing object body")?.bytes().await?;
- LandmarkSequence::from_bytes(&bytes, max_landmarks)
- .map_err(|e| e.to_string())?
- } else {
- LandmarkSequence::create(max_landmarks)
- };
+ let landmark_bytes: Option<Vec<u8>> =
+ with_retry(R2_MAX_RETRIES, R2_BASE_DELAY_MS, || async {
+ match bucket_clone.get(LANDMARK_KEY).execute().await? {
+ Some(obj) => {
+ let bytes =
+ obj.body().ok_or("missing object body")?.bytes().await?;
+ Ok(Some(bytes))
+ }
+ None => Ok(None),
+ }
+ })
+ .await?;
+ let mut seq = if let Some(bytes) = landmark_bytes {
+ LandmarkSequence::from_bytes(&bytes, max_landmarks)
+ .map_err(|e| e.to_string())?
+ } else {
+ LandmarkSequence::create(max_landmarks)
+ };

// Add the new landmark.
if seq.add(tree_size).map_err(|e| e.to_string())? {
// The landmark sequence was updated. Publish the result.
- bucket_clone
- .put(LANDMARK_KEY, seq.to_bytes().map_err(|e| e.to_string())?)
- .execute()
- .await?;
+ let seq_bytes = seq.to_bytes().map_err(|e| e.to_string())?;
+ with_retry(R2_MAX_RETRIES, R2_BASE_DELAY_MS, || async {
+ bucket_clone
+ .put(LANDMARK_KEY, seq_bytes.clone())
+ .execute()
+ .await
+ })
+ .await?;
}

// Update the landmark checkpoint.
- bucket_clone
- .put(LANDMARK_CHECKPOINT_KEY, new_checkpoint_str.clone())
- .execute()
- .await?;
+ let checkpoint_str = new_checkpoint_str.clone();
+ with_retry(R2_MAX_RETRIES, R2_BASE_DELAY_MS, || async {
+ bucket_clone
+ .put(LANDMARK_CHECKPOINT_KEY, checkpoint_str.clone())
+ .execute()
+ .await
+ })
+ .await?;

// Compute the landmark bundle and save it
let subtrees =
@@ -164,11 +183,15 @@
subtrees,
landmarks: seq.landmarks,
};
- bucket_clone
- // Can unwrap here because we use the autoderived Serialize impl for LandmarkBundle
- .put(LANDMARK_BUNDLE_KEY, serde_json::to_vec(&bundle).unwrap())
- .execute()
- .await?;
+ // Can unwrap here because we use the autoderived Serialize impl for LandmarkBundle.
+ let bundle_bytes = serde_json::to_vec(&bundle).unwrap();
+ with_retry(R2_MAX_RETRIES, R2_BASE_DELAY_MS, || async {
+ bucket_clone
+ .put(LANDMARK_BUNDLE_KEY, bundle_bytes.clone())
+ .execute()
+ .await
+ })
+ .await?;

Ok(())
}
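Because `with_retry` takes a `Fn` closure that may run once per attempt, payloads consumed by `put` are built once outside the closure and cloned inside it (hence the `seq_bytes.clone()`, `checkpoint_str.clone()`, and `bundle_bytes.clone()` calls above). A minimal sketch of that pattern; the `store` function is illustrative and not part of this change:

```rust
use generic_log_worker::{with_retry, R2_BASE_DELAY_MS, R2_MAX_RETRIES};
use worker::Result;

// Hypothetical sink that consumes its payload by value, like Bucket::put.
async fn store(payload: Vec<u8>) -> Result<()> {
    let _ = payload;
    Ok(())
}

async fn publish(bytes: Vec<u8>) -> Result<()> {
    // Build the payload once, then clone per attempt: each retry gets its own
    // copy because `store` takes the payload by value.
    with_retry(R2_MAX_RETRIES, R2_BASE_DELAY_MS, || async { store(bytes.clone()).await }).await
}
```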