
Commit 3421f09

mendesslbaquerofierro authored and committed
Add wshim support (cloudflare#167)
1 parent 43b909d · commit 3421f09

File tree: 3 files changed, +130 −59 lines

crates/generic_log_worker/src/lib.rs

Lines changed: 78 additions & 34 deletions
@@ -28,11 +28,12 @@ use std::cell::RefCell;
 use std::collections::btree_map::Entry;
 use std::collections::{BTreeMap, HashMap, VecDeque};
 use std::io::Write;
+use std::time::Duration;
 use tlog_tiles::{LookupKey, PendingLogEntry, SequenceMetadata};
 use tokio::sync::Mutex;
 use util::now_millis;
 use worker::{
-    js_sys, kv, kv::KvStore, wasm_bindgen, Bucket, Env, Error, HttpMetadata, Result, State,
+    js_sys, kv, kv::KvStore, wasm_bindgen, Bucket, Delay, Env, Error, HttpMetadata, Result, State,
     Storage, Stub,
 };
 

@@ -513,6 +514,36 @@ impl LockBackend for State {
     }
 }
 
+// R2 retry config: 3 retries with exponential backoff (100ms -> 200ms -> 400ms).
+pub const R2_MAX_RETRIES: u32 = 3;
+pub const R2_BASE_DELAY_MS: u64 = 100;
+
+/// Retries an async operation with exponential backoff.
+///
+/// # Errors
+///
+/// Returns the last error if all retry attempts fail.
+pub async fn with_retry<T, F, Fut>(max_retries: u32, base_delay_ms: u64, operation: F) -> Result<T>
+where
+    F: Fn() -> Fut,
+    Fut: std::future::Future<Output = Result<T>>,
+{
+    let mut last_error = None;
+    for attempt in 0..=max_retries {
+        match operation().await {
+            Ok(result) => return Ok(result),
+            Err(e) => {
+                last_error = Some(e);
+                if attempt < max_retries {
+                    let delay_ms = base_delay_ms * (1 << attempt);
+                    Delay::from(Duration::from_millis(delay_ms)).await;
+                }
+            }
+        }
+    }
+    Err(last_error.expect("with_retry: at least one attempt should have been made"))
+}
+
 pub trait ObjectBackend {
     /// Upload the object with the given key and data to the object backend,
     /// adding additional HTTP metadata headers based on the provided options.
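For reference, a minimal caller sketch of the new helper (not part of this diff; the fetch_with_retries function and its bucket/key parameters are hypothetical, and the constants and with_retry defined above are assumed to be in scope). With these constants the closure runs at most 1 + R2_MAX_RETRIES = 4 times, and the delay after a failed attempt n is R2_BASE_DELAY_MS * 2^n, i.e. 100 ms, 200 ms, then 400 ms; no delay follows the final failure.

use worker::{Bucket, Result};

// Hypothetical wrapper around an R2 GET, mirroring the `fetch` change later in this file.
async fn fetch_with_retries(bucket: &Bucket, key: &str) -> Result<Option<Vec<u8>>> {
    with_retry(R2_MAX_RETRIES, R2_BASE_DELAY_MS, || async {
        match bucket.get(key).execute().await? {
            Some(obj) => {
                // `body()` is `None` when the object has no readable body.
                let body = obj.body().ok_or("missing object body")?;
                Ok(Some(body.bytes().await?))
            }
            None => Ok(None),
        }
    })
    .await
}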
@@ -561,31 +592,39 @@ impl ObjectBackend for ObjectBucket {
         opts: &UploadOptions,
     ) -> Result<()> {
         let start = now_millis();
-        let mut metadata = HttpMetadata::default();
-        if let Some(content_type) = &opts.content_type {
-            metadata.content_type = Some(content_type.to_string());
+        let content_type = opts
+            .content_type
+            .clone()
+            .unwrap_or_else(|| "application/octet-stream".into());
+        let cache_control = if opts.immutable {
+            "public, max-age=604800, immutable"
         } else {
-            metadata.content_type = Some("application/octet-stream".into());
-        }
-        if opts.immutable {
-            metadata.cache_control = Some("public, max-age=604800, immutable".into());
-        } else {
-            metadata.cache_control = Some("no-store".into());
-        }
+            "no-store"
+        };
         let value: Vec<u8> = data.into();
+        let key_str = key.as_ref();
         self.metrics
             .as_ref()
             .inspect(|&m| m.upload_size_bytes.observe(value.len().as_f64()));
-        self.bucket
-            .put(key.as_ref(), value.clone())
-            .http_metadata(metadata)
-            .execute()
-            .await
-            .inspect_err(|_| {
-                self.metrics
-                    .as_ref()
-                    .inspect(|&m| m.errors.with_label_values(&["put"]).inc());
-            })?;
+
+        with_retry(R2_MAX_RETRIES, R2_BASE_DELAY_MS, || async {
+            let metadata = HttpMetadata {
+                content_type: Some(content_type.clone()),
+                cache_control: Some(cache_control.into()),
+                ..Default::default()
+            };
+            self.bucket
+                .put(key_str, value.clone())
+                .http_metadata(metadata)
+                .execute()
+                .await
+        })
+        .await
+        .inspect_err(|_| {
+            self.metrics
+                .as_ref()
+                .inspect(|&m| m.errors.with_label_values(&["put"]).inc());
+        })?;
 
         self.metrics.as_ref().inspect(|&m| {
             m.duration
@@ -598,20 +637,25 @@ impl ObjectBackend for ObjectBucket {
 
     async fn fetch<S: AsRef<str>>(&self, key: S) -> Result<Option<Vec<u8>>> {
         let start = now_millis();
-        let res = match self.bucket.get(key.as_ref()).execute().await? {
-            Some(obj) => {
-                let body = obj
-                    .body()
-                    .ok_or_else(|| format!("missing object body: {}", key.as_ref()))?;
-                let bytes = body.bytes().await.inspect_err(|_| {
-                    self.metrics.as_ref().inspect(|&m| {
-                        m.errors.with_label_values(&["get"]).inc();
-                    });
-                })?;
-                Ok(Some(bytes))
+        let key_str = key.as_ref();
+        let res = with_retry(R2_MAX_RETRIES, R2_BASE_DELAY_MS, || async {
+            match self.bucket.get(key_str).execute().await? {
+                Some(obj) => {
+                    let body = obj
+                        .body()
+                        .ok_or_else(|| format!("missing object body: {}", key_str))?;
+                    let bytes = body.bytes().await?;
+                    Ok(Some(bytes))
+                }
+                None => Ok(None),
             }
-            None => Ok(None),
-        };
+        })
+        .await
+        .inspect_err(|_| {
+            self.metrics
+                .as_ref()
+                .inspect(|&m| m.errors.with_label_values(&["get"]).inc());
+        });
         self.metrics.as_ref().inspect(|&m| {
             m.duration
                 .with_label_values(&["get"])
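Because the closure passed to with_retry is Fn() -> Fut and may run more than once, the retried body can only borrow or clone its inputs: key_str, content_type, cache_control, and value are computed once before the call and cloned inside the closure, HttpMetadata is rebuilt on every attempt, and the errors counters are incremented by the outer inspect_err only after every attempt has failed.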

crates/mtc_worker/src/frontend_worker.rs

Lines changed: 6 additions & 2 deletions
@@ -15,7 +15,7 @@ use generic_log_worker::{
     obs::Wshim,
     serialize,
     util::now_millis,
-    ObjectBackend, ObjectBucket, ENTRY_ENDPOINT,
+    with_retry, ObjectBackend, ObjectBucket, R2_BASE_DELAY_MS, R2_MAX_RETRIES, ENTRY_ENDPOINT,
 };
 use mtc_api::{
     serialize_signatureless_cert, AddEntryRequest, AddEntryResponse, BootstrapMtcLogEntry,
@@ -235,7 +235,11 @@ async fn main(req: Request, env: Env, ctx: Context) -> Result<Response> {
         // monitoring_url is unspecified.
         if CONFIG.logs[name].monitoring_url.is_empty() {
             let bucket = load_public_bucket(&ctx.env, name)?;
-            if let Some(obj) = bucket.get(key).execute().await? {
+            let obj_opt = with_retry(R2_MAX_RETRIES, R2_BASE_DELAY_MS, || async {
+                bucket.get(key).execute().await
+            })
+            .await?;
+            if let Some(obj) = obj_opt {
                 Response::from_body(
                     obj.body()
                         .ok_or("R2 object missing body")?

crates/mtc_worker/src/sequencer_do.rs

Lines changed: 46 additions & 23 deletions
@@ -9,8 +9,8 @@ use crate::{load_checkpoint_cosigner, load_origin, CONFIG};
 use generic_log_worker::{
     get_durable_object_name, load_public_bucket,
     log_ops::{prove_subtree_consistency, ProofError},
-    CachedRoObjectBucket, CheckpointCallbacker, GenericSequencer, ObjectBucket, SequencerConfig,
-    SEQUENCER_BINDING,
+    with_retry, CachedRoObjectBucket, CheckpointCallbacker, GenericSequencer, ObjectBucket,
+    SequencerConfig, R2_BASE_DELAY_MS, R2_MAX_RETRIES, SEQUENCER_BINDING,
 };
 use mtc_api::{
     BootstrapMtcLogEntry, LandmarkSequence, LANDMARK_BUNDLE_KEY, LANDMARK_CHECKPOINT_KEY,
@@ -132,28 +132,47 @@ fn checkpoint_callback(env: &Env, name: &str) -> CheckpointCallbacker {
             // https://github.com/cloudflare/workers-rs/issues/876
 
             // Load current landmark sequence.
-            let mut seq =
-                if let Some(obj) = bucket_clone.get(LANDMARK_KEY).execute().await? {
-                    let bytes = obj.body().ok_or("missing object body")?.bytes().await?;
-                    LandmarkSequence::from_bytes(&bytes, max_landmarks)
-                        .map_err(|e| e.to_string())?
-                } else {
-                    LandmarkSequence::create(max_landmarks)
-                };
+            let landmark_bytes: Option<Vec<u8>> =
+                with_retry(R2_MAX_RETRIES, R2_BASE_DELAY_MS, || async {
+                    match bucket_clone.get(LANDMARK_KEY).execute().await? {
+                        Some(obj) => {
+                            let bytes =
+                                obj.body().ok_or("missing object body")?.bytes().await?;
+                            Ok(Some(bytes))
+                        }
+                        None => Ok(None),
+                    }
+                })
+                .await?;
+            let mut seq = if let Some(bytes) = landmark_bytes {
+                LandmarkSequence::from_bytes(&bytes, max_landmarks)
+                    .map_err(|e| e.to_string())?
+            } else {
+                LandmarkSequence::create(max_landmarks)
+            };
+
             // Add the new landmark.
             if seq.add(tree_size).map_err(|e| e.to_string())? {
                 // The landmark sequence was updated. Publish the result.
-                bucket_clone
-                    .put(LANDMARK_KEY, seq.to_bytes().map_err(|e| e.to_string())?)
-                    .execute()
-                    .await?;
+                let seq_bytes = seq.to_bytes().map_err(|e| e.to_string())?;
+                with_retry(R2_MAX_RETRIES, R2_BASE_DELAY_MS, || async {
+                    bucket_clone
+                        .put(LANDMARK_KEY, seq_bytes.clone())
+                        .execute()
+                        .await
+                })
+                .await?;
             }
 
             // Update the landmark checkpoint.
-            bucket_clone
-                .put(LANDMARK_CHECKPOINT_KEY, new_checkpoint_str.clone())
-                .execute()
-                .await?;
+            let checkpoint_str = new_checkpoint_str.clone();
+            with_retry(R2_MAX_RETRIES, R2_BASE_DELAY_MS, || async {
+                bucket_clone
+                    .put(LANDMARK_CHECKPOINT_KEY, checkpoint_str.clone())
+                    .execute()
+                    .await
+            })
+            .await?;
 
             // Compute the landmark bundle and save it
             let subtrees =
@@ -164,11 +183,15 @@ fn checkpoint_callback(env: &Env, name: &str) -> CheckpointCallbacker {
                 subtrees,
                 landmarks: seq.landmarks,
             };
-            bucket_clone
-                // Can unwrap here because we use the autoderived Serialize impl for LandmarkBundle
-                .put(LANDMARK_BUNDLE_KEY, serde_json::to_vec(&bundle).unwrap())
-                .execute()
-                .await?;
+            // Can unwrap here because we use the autoderived Serialize impl for LandmarkBundle.
+            let bundle_bytes = serde_json::to_vec(&bundle).unwrap();
+            with_retry(R2_MAX_RETRIES, R2_BASE_DELAY_MS, || async {
+                bucket_clone
+                    .put(LANDMARK_BUNDLE_KEY, bundle_bytes.clone())
+                    .execute()
+                    .await
+            })
+            .await?;
 
             Ok(())
         }
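In the checkpoint callback, each R2 operation is retried independently: the landmark sequence is loaded under one with_retry call, and the writes of LANDMARK_KEY, LANDMARK_CHECKPOINT_KEY, and LANDMARK_BUNDLE_KEY are each wrapped separately, with their payloads (seq_bytes, checkpoint_str, bundle_bytes) serialized once up front and cloned per attempt. A transient failure on one write therefore retries only that write, not the preceding load and serialization steps.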
