
Commit 3a232c6

WIP add partial tile cleanup utility
The tlog-tiles and static-ct-api specs allow partial tiles to be deleted when the corresponding full tile is available. This helps to reduce R2 storage costs, but will incur extra cost for the R2 list and delete operations.

- Add a `Cleaner` Durable Object, which iterates over a log and, for each full tile available in the public bucket, lists and deletes any corresponding partial tiles.
- Track subrequests to prevent an alarm from exceeding the 1000-subrequest limit per invocation.
- (bonus) Lays the groundwork for implementing a tlog-witness or tlog-mirror as a service that periodically updates based on a target log's latest checkpoint.

TODO:
- Rebase after `worker 0.6.1+` is released so that `delete_multiple` will work.
- Hook up the Cleaner DO into the CT and MTC applications.
1 parent 21b429f commit 3a232c6
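As a rough sketch of the remaining hook-up TODO, a host Durable Object might delegate to `Cleaner` along the following lines. This is a minimal sketch, not part of the commit: the `CleanerDO` type, the `load_cleaner_config` helper, and the "public" bucket binding are hypothetical placeholders.

// Hypothetical wiring sketch; `CleanerDO`, `load_cleaner_config`, and the
// "public" bucket binding are illustrative placeholders, not part of this commit.
use generic_log_worker::cleaner_do::{Cleaner, CleanerConfig};
use worker::{durable_object, Bucket, DurableObject, Env, Request, Response, Result, State};

// Hypothetical helper: build the cleaner configuration from application settings.
fn load_cleaner_config(_env: &Env) -> CleanerConfig {
    todo!("construct CleanerConfig (name, origin, paths, verifiers, clean_interval)")
}

#[durable_object]
pub struct CleanerDO {
    cleaner: Cleaner,
}

#[durable_object]
impl DurableObject for CleanerDO {
    fn new(state: State, env: Env) -> Self {
        let bucket: Bucket = env.bucket("public").expect("missing R2 bucket binding");
        Self {
            cleaner: Cleaner::new(load_cleaner_config(&env), state.storage(), bucket),
        }
    }

    // First fetch initializes the cleaner and sets the recurring alarm.
    async fn fetch(&mut self, req: Request) -> Result<Response> {
        self.cleaner.fetch(req).await
    }

    // Periodic cleanup pass, bounded by the subrequest budget.
    async fn alarm(&mut self) -> Result<Response> {
        self.cleaner.alarm().await
    }
}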

4 files changed (+284, -1)


crates/ct_worker/wrangler.jsonc

Lines changed: 6 additions & 1 deletion
@@ -55,7 +55,12 @@
             "Batcher"
           ]
         }
-      ]
+      ],
+      "triggers": {
+        "crons": [
+          "* * * * *"
+        ]
+      }
     },
     "cftest": {
       "build": {
crates/generic_log_worker/src/cleaner_do.rs

Lines changed: 275 additions & 0 deletions
@@ -0,0 +1,275 @@
// Copyright (c) 2025 Cloudflare, Inc.
// Licensed under the BSD-3-Clause license found in the LICENSE file or at https://opensource.org/licenses/BSD-3-Clause

//! Cleaner removes no-longer-needed partial tiles from the object backend.

use futures_util::future::try_join_all;
use signed_note::{KeyName, VerifierList};
use std::{cell::RefCell, mem, time::Duration};
use tlog_tiles::{PathElem, TlogTile};
use worker::{Bucket, Error as WorkerError, Object, Request, Response, Storage};

use crate::{log_ops::CHECKPOINT_KEY, util::now_millis};

// Workers are limited to 1000 subrequests per invocation (including R2 operations).
// For each log, we'll need to perform the following subrequests:
// - Get old and new log sizes (2 ops)
// - List partials for full tree, data, and (optional) aux tiles (2-3 ops per 256 entries, plus logarithmic level-1+ tree tiles)
// - Delete partials for full tree, data, and (optional) aux tiles (0-3 ops per 256 entries, after <https://github.com/cloudflare/workers-rs/issues/780>)
// - Save new tree size (1 op)
// We track subrequests to avoid going over the limit, but can also limit the range of entries cleaned per invocation.
const SUBREQUEST_LIMIT: usize = 1000;
// Up to 1000 objects can be deleted from an R2 bucket in a single call.
// <https://developers.cloudflare.com/r2/api/workers/workers-api-reference/#bucket-method-definitions>
const MAX_DELETE_BATCH: usize = 1000;
const STEP: u64 = TlogTile::FULL_WIDTH as u64;
const CLEANED_SIZE_KEY: &str = "cleaned_size";
const CURRENT_SIZE_KEY: &str = "current_size";

pub struct CleanerConfig {
    pub name: String,
    pub origin: KeyName,
    pub data_path: PathElem,
    pub aux_path: Option<PathElem>,
    pub verifiers: VerifierList,
    pub clean_interval: Duration,
}

pub struct Cleaner {
    config: CleanerConfig,
    storage: Storage,
    bucket: Bucket,
    cleaned_size: RefCell<u64>,
    current_size: RefCell<u64>,
    subrequests: RefCell<usize>,
    initialized: RefCell<bool>,
}

impl Cleaner {
    /// Return a new partial tile cleaner.
    pub fn new(config: CleanerConfig, storage: Storage, bucket: Bucket) -> Self {
        Self {
            storage,
            config,
            bucket,
            cleaned_size: RefCell::new(0),
            current_size: RefCell::new(0),
            subrequests: RefCell::new(0),
            initialized: RefCell::new(false),
        }
    }

    /// Initialize the partial tile cleaner by loading a previously-saved
    /// `cleaned_size` and starting an alarm that will trigger every
    /// `clean_interval` and attempt to do log cleanup.
    ///
    /// # Errors
    /// Will return an error if the alarm cannot be initialized.
    async fn initialize(&self) -> Result<(), WorkerError> {
        // Start the cleaner loop (OK if alarm is already set).
        self.storage.set_alarm(self.config.clean_interval).await?;

        // Load the cleaned size, if it has been previously saved.
        if let Ok(cleaned_size) = self.storage.get::<u64>(CLEANED_SIZE_KEY).await {
            *self.cleaned_size.borrow_mut() = cleaned_size;
        }

        // Load the current log size, if it has been previously saved.
        if let Ok(current_size) = self.storage.get::<u64>(CURRENT_SIZE_KEY).await {
            *self.current_size.borrow_mut() = current_size;
        }

        *self.initialized.borrow_mut() = true;

        Ok(())
    }

    /// Fetch handler for the partial tile cleaner. This should only ever need
    /// to be called once to trigger initial alarm creation (but OK to call
    /// multiple times).
    ///
    /// # Errors
    /// Will return an error if initialization fails.
    pub async fn fetch(&self, _req: Request) -> Result<Response, WorkerError> {
        if !*self.initialized.borrow() {
            self.initialize().await?;
        }
        Response::ok("Started cleaner")
    }

    /// Alarm handler for the partial tile cleaner. This runs in a loop
    /// iterating over the log contents and removing partial tiles whose
    /// corresponding full tiles are already available.
    ///
    /// # Errors
    /// Will return an error if initialization or cleaning fails.
    pub async fn alarm(&self) -> Result<Response, WorkerError> {
        // Reset the subrequest count.
        *self.subrequests.borrow_mut() = 0;

        let name = &self.config.name;
        if !*self.initialized.borrow() {
            log::info!("{name}: Initializing cleaner from alarm handler");
            self.initialize().await?;
        }
        // Schedule the next cleaning.
        self.storage.set_alarm(self.config.clean_interval).await?;

        // Clean the log.
        if let Err(e) = self.clean_log().await {
            log::warn!("{name}: Error cleaning log: {e}");
        }

        Response::ok("Alarm done")
    }

    // Clean up partial tiles from a log, stopping either when the current log
    // size is reached or the subrequest limit is reached. After each deletion
    // operation, save the new cleaned size to durable storage.
    async fn clean_log(&self) -> Result<u64, WorkerError> {
        // Update the current log size if we're caught up to the previous
        // current log size.
        if *self.current_size.borrow() < *self.cleaned_size.borrow() + STEP {
            let new_size = self.current_size().await?;
            *self.current_size.borrow_mut() = new_size;
            self.storage.put(CURRENT_SIZE_KEY, new_size).await?;
        }

        // Reserve a subrequest to delete the final batch.
        self.checked_add_subrequests(1)?;

        let mut pending_cleaned_size = *self.cleaned_size.borrow();
        let mut batch: Vec<String> = Vec::with_capacity(MAX_DELETE_BATCH);

        while pending_cleaned_size + STEP <= *self.current_size.borrow() {
            match self
                .clean_partials_in_range(pending_cleaned_size, &mut batch)
                .await
            {
                Ok(()) => (),
                Err(e) => {
                    // Do not return the error, so that progress made so far is still saved below.
                    log::warn!("{}: failed to clean range: {e}", self.config.name);
                    break;
                }
            }

            pending_cleaned_size += STEP;
        }

        // Delete the final batch (using the reserved subrequest).
        if !batch.is_empty() {
            self.bucket.delete_multiple(batch).await?;
        }

        // Save progress after deleting the final batch.
        *self.cleaned_size.borrow_mut() = pending_cleaned_size;
        self.storage
            .put(CLEANED_SIZE_KEY, pending_cleaned_size)
            .await?;

        Ok(pending_cleaned_size)
    }

    // List and queue for deletion all partial tiles corresponding to full tree
    // tiles that would be generated between the old and new tree sizes.
    async fn clean_partials_in_range(
        &self,
        pending_cleaned_size: u64,
        batch: &mut Vec<String>,
    ) -> Result<(), WorkerError> {
        let mut prefixes = Vec::new();
        for tile in TlogTile::new_tiles(pending_cleaned_size, pending_cleaned_size + STEP) {
            // Full tiles only. If the full tile exists, the corresponding partial tiles can be deleted.
            if tile.width() == TlogTile::FULL_WIDTH {
                prefixes.push(format!("{}.p/", tile.path()));
                if tile.level() == 0 {
                    // For level-0 tree tiles, also delete the corresponding
                    // data and (optional) aux files.
                    prefixes.push(format!(
                        "{}.p/",
                        tile.with_data_path(self.config.data_path).path()
                    ));
                    if let Some(aux_path) = self.config.aux_path {
                        prefixes.push(format!("{}.p/", tile.with_data_path(aux_path).path()));
                    }
                }
            }
        }
        for partial_tile in try_join_all(prefixes.iter().map(|prefix| self.list_prefix(prefix)))
            .await?
            .into_iter()
            .flatten()
        {
            batch.push(partial_tile);

            if batch.len() == MAX_DELETE_BATCH {
                // Delete a full batch.
                self.checked_add_subrequests(1)?;
                self.bucket.delete_multiple(mem::take(batch)).await?;

                // Save progress.
                *self.cleaned_size.borrow_mut() = pending_cleaned_size;
                self.storage
                    .put(CLEANED_SIZE_KEY, pending_cleaned_size)
                    .await?;
            }
        }

        Ok(())
    }

    // List files with the specified prefix.
    async fn list_prefix(&self, prefix: &str) -> Result<Vec<String>, WorkerError> {
        self.checked_add_subrequests(1)?;
        Ok(self
            .bucket
            .list()
            .prefix(prefix)
            .execute()
            .await?
            .objects()
            .iter()
            .map(Object::key)
            .collect::<Vec<_>>())
    }

    // Get the latest log size by retrieving the checkpoint from object storage.
    async fn current_size(&self) -> Result<u64, WorkerError> {
        self.checked_add_subrequests(1)?;
        let checkpoint_bytes = self
            .bucket
            .get(CHECKPOINT_KEY)
            .execute()
            .await?
            .ok_or("failed to retrieve checkpoint from object storage")?
            .body()
            .ok_or("missing object body")?
            .bytes()
            .await?;
        let checkpoint = tlog_tiles::open_checkpoint(
            self.config.origin.as_str(),
            &self.config.verifiers,
            now_millis(),
            &checkpoint_bytes,
        )
        .map_err(|e| e.to_string())?
        .0;

        Ok(checkpoint.size())
    }

    // Add to the subrequest count after checking that the new subrequests will not
    // put the worker over the limit.
    //
    // # Errors
    // Will return an error if the additional subrequests would cause the limit
    // to be exceeded.
    fn checked_add_subrequests(&self, new: usize) -> Result<(), WorkerError> {
        if *self.subrequests.borrow() + new > SUBREQUEST_LIMIT {
            return Err("reached subrequest limit".into());
        }
        *self.subrequests.borrow_mut() += new;
        Ok(())
    }
}
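To put the subrequest budget described in the header comment in perspective, here is a back-of-the-envelope estimate of how much one alarm invocation can clean. It is a sketch under assumptions drawn from that comment: about three list operations per 256-entry step for a log with an aux path, deletes amortized into 1000-key batches, and a small fixed overhead.

fn main() {
    const SUBREQUEST_LIMIT: usize = 1000;
    const OVERHEAD: usize = 4; // size reads, final batch delete, progress write
    const LISTS_PER_STEP: usize = 3; // level-0 tree, data, and aux partial prefixes
    const STEP: usize = 256; // TlogTile::FULL_WIDTH

    // Higher-level tree tiles and full delete batches consume a few extra
    // subrequests in practice, so treat this as a rough upper bound.
    let steps = (SUBREQUEST_LIMIT - OVERHEAD) / LISTS_PER_STEP;
    println!("~{} entries cleanable per alarm invocation", steps * STEP); // ~84,992
}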

crates/generic_log_worker/src/lib.rs

Lines changed: 1 addition & 0 deletions
@@ -5,6 +5,7 @@ use base64::prelude::BASE64_STANDARD;
 use base64::Engine;
 
 pub mod batcher_do;
+pub mod cleaner_do;
 pub mod log_ops;
 mod metrics;
 pub mod sequencer_do;

crates/generic_log_worker/src/log_ops.rs

Lines changed: 2 additions & 0 deletions
@@ -51,7 +51,9 @@ use worker::Error as WorkerError;
 const DATA_TILE_LEVEL_KEY: u8 = u8::MAX;
 /// Same as above, anything above 63 is fine to use as the level key.
 const UNHASHED_TILE_LEVEL_KEY: u8 = u8::MAX - 1;
+/// Path used to store checkpoints, both in the object storage and lock backends.
 pub const CHECKPOINT_KEY: &str = "checkpoint";
+/// Path used to store staging bundles in the lock backend.
 const STAGING_KEY: &str = "staging";
 
 // Limit on the number of entries per batch. Tune this parameter to avoid
