
Commit fe6b7a4

Update to worker 0.6.0, addressing breaking changes
- Durable Object methods no longer take a mutable reference, so we need to use interior mutability (RefCell) to update state, as recommended by the workers-rs maintainers. The tricky part is avoiding passing a borrowed Ref to an async function, as that could cause a panic if, for example, a concurrent async operation attempts to borrow an already-mutably-borrowed value.
- To simplify the above change, remove the Option wrapper from the GenericSequencer's `sequence_state` field. Instead of loading the sequencer state just-in-time for the first sequencing, load it during the DO initialization (which happens after the first fetch or alarm). Similarly, when there is a fatal sequencing error requiring reloading the sequence state, load the state right away rather than waiting for the next sequencing.
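
The gist of the first point, as a minimal sketch based on the API usage visible in the diffs below: in worker 0.6.0 the `#[durable_object]` attribute goes only on the struct, `fetch` takes `&self`, and mutable state moves into a `RefCell`. Any `Ref`/`RefMut` has to be dropped before the handler awaits, so a concurrently running request can't hit an outstanding borrow. The `Counter` object and its `count` field are illustrative only, not part of this repository:

```rust
use std::cell::RefCell;
use worker::*;

#[durable_object]
struct Counter {
    // Interior mutability, since fetch() now only receives &self.
    count: RefCell<u64>,
}

impl DurableObject for Counter {
    fn new(_state: State, _env: Env) -> Self {
        Self { count: RefCell::new(0) }
    }

    async fn fetch(&self, _req: Request) -> Result<Response> {
        // Scope the RefMut so it is dropped immediately. If this handler
        // later awaits (e.g. a storage call), no borrow is held across the
        // suspension point, so a concurrent request can borrow safely.
        let current = {
            let mut count = self.count.borrow_mut();
            *count += 1;
            *count
        };
        Response::ok(current.to_string())
    }
}
```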
1 parent 9460660 commit fe6b7a4

13 files changed: +256 −358 lines changed


Cargo.lock

Lines changed: 14 additions & 25 deletions

Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -68,7 +68,7 @@ thiserror = "2.0"
 tlog_tiles = { path = "crates/tlog_tiles", version = "0.2.0" }
 tokio = { version = "1", features = ["sync"] }
 url = "2.2"
-worker = "0.5.0"
+worker = "0.6.0"
 x509-cert = "0.2.5"
 x509-verify = { version = "0.4.4", features = [
     "md2",

crates/ct_worker/src/batcher_do.rs

Lines changed: 1 addition & 2 deletions
@@ -7,7 +7,6 @@ use worker::*;
 #[durable_object]
 struct Batcher(GenericBatcher<StaticCTPendingLogEntry>);
 
-#[durable_object]
 impl DurableObject for Batcher {
     fn new(state: State, env: Env) -> Self {
         // Find the Durable Object name by enumerating all possibilities.
@@ -52,7 +51,7 @@ impl DurableObject for Batcher {
         Batcher(GenericBatcher::new(config, kv, sequencer))
     }
 
-    async fn fetch(&mut self, req: Request) -> Result<Response> {
+    async fn fetch(&self, req: Request) -> Result<Response> {
         self.0.fetch(req).await
     }
 }

crates/ct_worker/src/frontend_worker.rs

Lines changed: 1 addition & 1 deletion
@@ -300,7 +300,7 @@ fn batcher_id_from_lookup_key(key: &LookupKey, num_batchers: u8) -> u8 {
 }
 
 fn headers_from_http_metadata(meta: HttpMetadata) -> Headers {
-    let mut h = Headers::new();
+    let h = Headers::new();
     if let Some(hdr) = meta.cache_control {
         h.append("Cache-Control", &hdr).unwrap();
     }

crates/ct_worker/src/sequencer_do.rs

Lines changed: 2 additions & 3 deletions
@@ -17,7 +17,6 @@ use worker::*;
 #[durable_object]
 struct Sequencer(GenericSequencer<StaticCTLogEntry>);
 
-#[durable_object]
 impl DurableObject for Sequencer {
     fn new(state: State, env: Env) -> Self {
         // Find the Durable Object name by enumerating all possibilities.
@@ -77,11 +76,11 @@ impl DurableObject for Sequencer {
         Sequencer(GenericSequencer::new(config, state, bucket, registry))
     }
 
-    async fn fetch(&mut self, req: Request) -> Result<Response> {
+    async fn fetch(&self, req: Request) -> Result<Response> {
         self.0.fetch(req).await
     }
 
-    async fn alarm(&mut self) -> Result<Response> {
+    async fn alarm(&self) -> Result<Response> {
         self.0.alarm().await
     }
 }

crates/generic_log_worker/src/batcher_do.rs

Lines changed: 21 additions & 20 deletions
@@ -10,6 +10,7 @@ use crate::{LookupKey, SequenceMetadata, BATCH_ENDPOINT, ENTRY_ENDPOINT};
 use base64::prelude::*;
 use futures_util::future::{join_all, select, Either};
 use std::{
+    cell::RefCell,
     collections::{HashMap, HashSet},
     time::Duration,
 };
@@ -23,9 +24,9 @@ pub struct GenericBatcher<E: PendingLogEntry> {
     config: BatcherConfig,
     kv: Option<KvStore>,
     sequencer: Stub,
-    batch: Batch<E>,
-    in_flight: usize,
-    processed: usize,
+    batch: RefCell<Batch<E>>,
+    in_flight: RefCell<usize>,
+    processed: RefCell<usize>,
 }
 
 pub struct BatcherConfig {
@@ -59,9 +60,9 @@ impl<E: PendingLogEntry> GenericBatcher<E> {
             config,
             kv,
             sequencer,
-            batch: Batch::default(),
-            in_flight: 0,
-            processed: 0,
+            batch: RefCell::new(Batch::default()),
+            in_flight: RefCell::new(0),
+            processed: RefCell::new(0),
         }
     }
 
@@ -73,26 +74,26 @@ impl<E: PendingLogEntry> GenericBatcher<E> {
     ///
     /// Returns an error if the request cannot be parsed or response cannot be
     /// constructed.
-    pub async fn fetch(&mut self, mut req: Request) -> Result<Response> {
+    pub async fn fetch(&self, mut req: Request) -> Result<Response> {
         match req.path().as_str() {
             ENTRY_ENDPOINT => {
                 let entry: E = req.json().await?;
                 let key = entry.lookup_key();
 
-                self.in_flight += 1;
-                self.processed += 1;
+                *self.in_flight.borrow_mut() += 1;
+                *self.processed.borrow_mut() += 1;
 
                 // Add entry to the current pending batch if it isn't already present.
                 // Rely on the Sequencer to deduplicate entries across batches.
-                if !self.batch.by_hash.contains(&key) {
-                    self.batch.by_hash.insert(key);
-                    self.batch.entries.push(entry);
+                if !self.batch.borrow().by_hash.contains(&key) {
+                    self.batch.borrow_mut().by_hash.insert(key);
+                    self.batch.borrow_mut().entries.push(entry);
                 }
 
-                let mut recv = self.batch.done.subscribe();
+                let mut recv = self.batch.borrow().done.subscribe();
 
                 // Submit the current pending batch if it's full.
-                if self.batch.entries.len() >= self.config.max_batch_entries {
+                if self.batch.borrow().entries.len() >= self.config.max_batch_entries {
                     if let Err(e) = self.submit_batch().await {
                         log::warn!("{} failed to submit full batch: {e}", self.config.name);
                     }
@@ -112,7 +113,7 @@ impl<E: PendingLogEntry> GenericBatcher<E> {
                     }
                     Either::Right(((), batch_done)) => {
                         // Batch timeout reached; submit this entry's batch if no-one has already.
-                        if self.batch.by_hash.contains(&key) {
+                        if self.batch.borrow().by_hash.contains(&key) {
                             if let Err(e) = self.submit_batch().await {
                                 log::warn!(
                                     "{} failed to submit timed-out batch: {e}",
@@ -140,7 +141,7 @@ impl<E: PendingLogEntry> GenericBatcher<E> {
                     // submitting the batch or rate limiting at the Sequencer.
                     Response::error("rate limited", 429)
                 };
-                self.in_flight -= 1;
+                *self.in_flight.borrow_mut() -= 1;
 
                 resp
             }
@@ -156,16 +157,16 @@ impl<E: PendingLogEntry> GenericBatcher<E> {
     ///
    /// Returns an error if there are issues constructing or sending requests to
    /// the sequencer or deduplication cache.
-    pub async fn submit_batch(&mut self) -> Result<()> {
+    pub async fn submit_batch(&self) -> Result<()> {
        // Take the current pending batch and replace it with a new one.
-        let batch = std::mem::take(&mut self.batch);
+        let batch = std::mem::take(&mut *self.batch.borrow_mut());
 
        log::debug!(
            "{} submitting batch: leaves={} inflight={} processed={}",
            self.config.name,
            batch.entries.len(),
-            self.in_flight,
-            self.processed,
+            self.in_flight.borrow(),
+            self.processed.borrow(),
        );
 
        // Submit the batch, and wait for it to be sequenced.
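
Worth noting for readers tracking the borrow discipline in the hunks above: every `borrow()`/`borrow_mut()` is a temporary dropped at the end of its own statement, and `submit_batch` swaps the whole batch out with `std::mem::take` before anything is awaited, so no `Ref`/`RefMut` lives across a suspension point. A contrived sketch of the failure mode this avoids (the `Shared` type and `do_io` are illustrative only, not from this codebase):

```rust
use std::cell::RefCell;

struct Shared {
    entries: RefCell<Vec<u32>>,
}

// Stand-in for any awaited call (storage, subrequest, timer, ...).
async fn do_io(_len: usize) {}

impl Shared {
    // BUG: the Ref returned by borrow() stays alive across the await. If
    // another task on the same thread calls borrow_mut() while this future
    // is suspended, that call panics at runtime ("already borrowed").
    async fn risky(&self) {
        let entries = self.entries.borrow();
        do_io(entries.len()).await;
    }

    // OK: copy what is needed out of the borrow, drop it, then await.
    async fn safe(&self) {
        let len = self.entries.borrow().len();
        do_io(len).await;
    }
}
```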

crates/generic_log_worker/src/lib.rs

Lines changed: 18 additions & 15 deletions
@@ -19,6 +19,7 @@ use log_ops::UploadOptions;
 use metrics::{millis_diff_as_secs, AsF64, ObjectMetrics};
 use serde_bytes::ByteBuf;
 use sha2::{Digest, Sha256};
+use std::cell::RefCell;
 use std::collections::{HashMap, VecDeque};
 use std::io::Write;
 use std::str::FromStr;
@@ -148,7 +149,7 @@ pub async fn put_cache_entry_metadata<L: PendingLogEntry>(
 
 trait CacheWrite {
     /// Put the provided sequenced entries into the cache. This does NOT overwrite existing entries.
-    async fn put_entries(&mut self, entries: &[(LookupKey, SequenceMetadata)]) -> Result<()>;
+    async fn put_entries(&self, entries: &[(LookupKey, SequenceMetadata)]) -> Result<()>;
 }
 
 trait CacheRead {
@@ -163,7 +164,7 @@ struct DedupCache {
 
 impl CacheWrite for DedupCache {
     /// Write entries to both the short-term deduplication cache and its backup in DO Storage.
-    async fn put_entries(&mut self, entries: &[(LookupKey, SequenceMetadata)]) -> Result<()> {
+    async fn put_entries(&self, entries: &[(LookupKey, SequenceMetadata)]) -> Result<()> {
         if entries.is_empty() {
             return Ok(());
         }
@@ -194,7 +195,7 @@ impl DedupCache {
     }
 
     // Load batches of cache entries from DO storage into the in-memory cache.
-    async fn load(&mut self) -> Result<()> {
+    async fn load(&self) -> Result<()> {
         let head = self
             .storage
             .get::<usize>(Self::FIFO_HEAD_KEY)
@@ -216,7 +217,7 @@ impl DedupCache {
     }
 
     // Store a batch of cache entries in DO storage.
-    async fn store(&mut self, entries: &[(LookupKey, SequenceMetadata)]) -> Result<()> {
+    async fn store(&self, entries: &[(LookupKey, SequenceMetadata)]) -> Result<()> {
         let head = self
             .storage
             .get::<usize>(Self::FIFO_HEAD_KEY)
@@ -272,38 +273,40 @@ fn deserialize_entries(buf: &[u8]) -> Result<Vec<(LookupKey, SequenceMetadata)>>
 // A fixed-size in-memory FIFO cache.
 struct MemoryCache {
     max_size: usize,
-    map: HashMap<LookupKey, SequenceMetadata>,
-    fifo: VecDeque<LookupKey>,
+    map: RefCell<HashMap<LookupKey, SequenceMetadata>>,
+    fifo: RefCell<VecDeque<LookupKey>>,
 }
 
 impl MemoryCache {
     fn new(max_size: usize) -> Self {
         assert_ne!(max_size, 0);
         Self {
             max_size,
-            fifo: VecDeque::with_capacity(max_size),
-            map: HashMap::with_capacity(max_size),
+            fifo: RefCell::new(VecDeque::with_capacity(max_size)),
+            map: RefCell::new(HashMap::with_capacity(max_size)),
         }
     }
 
     // Get an entry from the in-memory cache.
     fn get_entry(&self, key: &LookupKey) -> Option<SequenceMetadata> {
-        self.map.get(key).copied()
+        self.map.borrow().get(key).copied()
     }
 
     // Put a batch of entries into the in-memory cache,
     // evicting old entries to make room if necessary.
-    fn put_entries(&mut self, entries: &[(LookupKey, SequenceMetadata)]) {
+    fn put_entries(&self, entries: &[(LookupKey, SequenceMetadata)]) {
+        let mut map = self.map.borrow_mut();
+        let mut fifo = self.fifo.borrow_mut();
         for (key, value) in entries {
-            if self.map.contains_key(key) {
+            if map.contains_key(key) {
                 continue;
             }
-            if self.map.len() == self.max_size {
+            if map.len() == self.max_size {
                 // Evict oldest entry to make room.
-                self.map.remove(&self.fifo.pop_front().unwrap());
+                map.remove(&fifo.pop_front().unwrap());
             }
-            self.fifo.push_back(*key);
-            self.map.insert(*key, *value);
+            fifo.push_back(*key);
+            map.insert(*key, *value);
         }
     }
 }
