Commit c24e2bf
WIP add partial tile cleanup utility
The tlog-tiles and static-ct-api specs allow partial tiles to be deleted when the corresponding full tile is available. This helps to reduce R2 storage costs, but will incur extra cost for the R2 list and delete operations.

There are some limitations with the current cron job approach:

* Workers have a 1000-subrequest limit, so log cleanup needs to be broken up into many invocations, saving state in between.
* Cron jobs can only be triggered once a minute, so cleaning up a log could be slow.
* A single cron job is responsible for cleaning up all logs.
* The workers-rs Rust bindings currently don't support deleting multiple keys at once from a bucket, so we could quickly hit subrequest limits.

TODO: Switch to a Durable Object to manage log cleanup. This has some nice benefits:

* Saving/loading state is easy with DO storage.
* DO alarms can be scheduled for immediate execution, so we aren't wasting time in between invocations.
* We can have one cleanup DO per log for better parallelism.
* We can easily make this generic the same way as for the other DOs.
* (bonus) Lays the groundwork for implementing a tlog-witness or tlog-mirror as a service that periodically updates based on a target log's latest checkpoint.
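A minimal sketch of what that Durable Object could look like using workers-rs alarms; nothing below exists in this commit (CleanupDO and the "cleaned_size" storage key are hypothetical), and the per-round work is elided:

use worker::*;

// Hypothetical per-log cleanup Durable Object; all names are illustrative.
#[durable_object]
pub struct CleanupDO {
    state: State,
}

#[durable_object]
impl DurableObject for CleanupDO {
    fn new(state: State, _env: Env) -> Self {
        Self { state }
    }

    // A request from the frontend kicks off cleanup by arming an immediate alarm.
    async fn fetch(&mut self, _req: Request) -> Result<Response> {
        self.state
            .storage()
            .set_alarm(std::time::Duration::from_millis(0))
            .await?;
        Response::ok("cleanup scheduled")
    }

    async fn alarm(&mut self) -> Result<Response> {
        // Saving/loading state is easy with DO storage.
        let cleaned: u64 = self.state.storage().get("cleaned_size").await.unwrap_or(0);
        // ... run one subrequest-budgeted cleanup round here ...
        self.state.storage().put("cleaned_size", cleaned).await?;
        // Re-arm with zero delay instead of waiting for the next cron minute.
        self.state
            .storage()
            .set_alarm(std::time::Duration::from_millis(0))
            .await?;
        Response::ok("")
    }
}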
1 parent 73b7438 commit c24e2bf

File tree

7 files changed: +278 −32 lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Generated file; diff not rendered.

crates/ct_worker/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -53,6 +53,7 @@ serde_with.workspace = true
 sha2.workspace = true
 static_ct_api.workspace = true
 signed_note.workspace = true
+thiserror.workspace = true
 tlog_tiles.workspace = true
 worker.workspace = true
 x509-verify.workspace = true

crates/ct_worker/src/lib.rs

Lines changed: 32 additions & 1 deletion
@@ -6,16 +6,19 @@
 use config::AppConfig;
 use ed25519_dalek::SigningKey as Ed25519SigningKey;
 use p256::{ecdsa::SigningKey as EcdsaSigningKey, pkcs8::DecodePrivateKey};
+use signed_note::KeyName;
+use static_ct_api::StaticCTCheckpointSigner;
 use std::collections::HashMap;
 use std::sync::{LazyLock, OnceLock};
-use tlog_tiles::{LookupKey, SequenceMetadata};
+use tlog_tiles::{CheckpointSigner, Ed25519CheckpointSigner, LookupKey, SequenceMetadata};
 #[allow(clippy::wildcard_imports)]
 use worker::*;
 use x509_util::CertPool;
 use x509_verify::x509_cert::Certificate;

 mod batcher_do;
 mod frontend_worker;
+mod partial_cleanup_cron;
 mod sequencer_do;

 // Application configuration.
@@ -72,3 +75,31 @@ pub(crate) fn load_witness_key(env: &Env, name: &str) -> Result<&'static Ed25519
         Ok(once.get_or_init(|| key))
     }
 }
+
+pub(crate) fn load_checkpoint_signers(env: &Env, name: &str) -> Vec<Box<dyn CheckpointSigner>> {
+    let origin = load_origin(name);
+    let signing_key = load_signing_key(env, name).unwrap().clone();
+    let witness_key = load_witness_key(env, name).unwrap().clone();
+
+    // Make the checkpoint signers from the secret keys and put them in a vec
+    let signer = StaticCTCheckpointSigner::new(origin.clone(), signing_key)
+        .map_err(|e| format!("could not create static-ct checkpoint signer: {e}"))
+        .unwrap();
+    let witness = Ed25519CheckpointSigner::new(origin, witness_key)
+        .map_err(|e| format!("could not create ed25519 checkpoint signer: {e}"))
+        .unwrap();
+
+    vec![Box::new(signer), Box::new(witness)]
+}
+
+pub(crate) fn load_origin(name: &str) -> KeyName {
+    KeyName::new(
+        CONFIG.logs[name]
+            .submission_url
+            .trim_start_matches("http://")
+            .trim_start_matches("https://")
+            .trim_end_matches('/')
+            .to_string(),
+    )
+    .expect("invalid origin name")
+}
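Per static-ct-api, the checkpoint origin must be the submission prefix of the log as a schema-less URL with no trailing slashes, which is exactly what load_origin derives from the config. For example, a hypothetical submission_url of "https://ct.example.com/logs/dev2025h2/" becomes the origin "ct.example.com/logs/dev2025h2".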
crates/ct_worker/src/partial_cleanup_cron.rs

Lines changed: 231 additions & 0 deletions
@@ -0,0 +1,231 @@
+use futures_util::future::join_all;
+use generic_log_worker::{load_public_bucket, log_ops::CHECKPOINT_KEY, util::now_millis};
+use signed_note::{KeyName, NoteVerifier, VerifierList};
+use static_ct_api::StaticCTPendingLogEntry;
+use tlog_tiles::{PendingLogEntry, TlogTile};
+use worker::{event, Bucket, Env, ScheduleContext, ScheduledEvent};
+
+use crate::{load_checkpoint_signers, load_origin, CONFIG};
+
+// Workers are limited to 1000 subrequests per invocation (including R2 operations).
+// For each log, we'll need to perform the following subrequests:
+// - Get old and new log sizes (2 ops)
+// - List partials for full tree, data, and (optional) aux tiles (2-3 ops per 256 entries, plus logarithmic level-1+ tree tiles)
+// - Delete partials for full tree, data, and (optional) aux tiles (0-3 ops per 256 entries, after <https://github.com/cloudflare/workers-rs/issues/780>)
+// - Save new tree size (1 op)
+// We track subrequests to avoid going over the limit, but can still limit the range of entries.
+const SUBREQUEST_LIMIT: usize = 1000;
+const STEP: u64 = TlogTile::FULL_WIDTH as u64;
+const CLEANED_SIZE_KEY: &str = "_cleanup_cron_progress";
+
+#[derive(thiserror::Error, Debug)]
+enum CleanupError {
+    #[error(transparent)]
+    Worker(#[from] worker::Error),
+    #[error("subrequest limit")]
+    Subrequests,
+}
+
+/// Partial tile cleanup cron job periodically does the following:
+///
+/// for each configured log:
+/// 1. set new_size to the current (verified) checkpoint size
+/// 2. set old_size to the checkpoint size when the cron job previously successfully ran
+/// 3. get the list of tiles created between old_size and new_size (via `TlogTile::new_tiles(old_size, new_size)`)
+/// 4. for each full tile:
+///    a. list the corresponding partial tiles (matching the prefix "<full tile key>.p/")
+///    b. delete the partial tiles
+#[event(scheduled)]
+pub async fn scheduled(_event: ScheduledEvent, env: Env, _ctx: ScheduleContext) {
+    let mut subrequests = 0;
+    for name in CONFIG.logs.keys() {
+        if checked_add_subrequests(&mut subrequests, 3).is_err() {
+            // We need three subrequests to check and set the log size. If we've
+            // already reached the subrequest limit, stop now.
+            return;
+        }
+
+        let origin = &load_origin(name);
+        let verifiers = &VerifierList::new(
+            load_checkpoint_signers(&env, name)
+                .iter()
+                .map(|s| s.verifier())
+                .collect::<Vec<Box<dyn NoteVerifier>>>(),
+        );
+        let bucket = &load_public_bucket(&env, name).unwrap();
+        let current_log_size = current_log_size(origin, verifiers, bucket).await.unwrap();
+        let old_cleaned_size = cleaned_size(bucket).await.unwrap();
+        log::debug!("cleaning {name}: {old_cleaned_size} to {current_log_size}");
+        match clean_log(old_cleaned_size, current_log_size, bucket, &mut subrequests).await {
+            Ok(cleaned_size) => {
+                // Save progress on cleaning the log.
+                if cleaned_size > old_cleaned_size {
+                    let _ = set_cleaned_size(cleaned_size, bucket)
+                        .await
+                        .inspect_err(|e| log::warn!("failed to update cleaned size: {name}: {e}"));
+                }
+            }
+            Err(e) => log::warn!("failed to clean log: {name}: {e}"),
+        }
+    }
+}
+
+// Clean up partial tiles from a log, stopping either when the current log size
+// is reached or the subrequest limit is reached. Returns the size of the tree
+// that has been cleaned so partial progress can be saved.
+async fn clean_log(
+    old_size: u64,
+    new_size: u64,
+    bucket: &Bucket,
+    subrequests: &mut usize,
+) -> Result<u64, CleanupError> {
+    let mut cleaned_size = old_size;
+    loop {
+        if cleaned_size + STEP > new_size {
+            // We've already cleaned the last full tile, so nothing else to do.
+            break;
+        }
+        match clean_log_range(cleaned_size, cleaned_size + STEP, subrequests, bucket).await {
+            Ok(()) => cleaned_size += STEP,
+            Err(e) => {
+                return match e {
+                    CleanupError::Subrequests => Ok(cleaned_size),
+                    CleanupError::Worker(_) => Err(e),
+                }
+            }
+        }
+    }
+    Ok(cleaned_size)
+}
+
+// Attempt to clean up all partial tiles within the specified range. Any failure
+// will require the full range to be retried later.
+//
+// # Errors
+// Will return `CleanupError::Subrequests` if the operation cannot be completed
+// because it would run into subrequest limits, and will return a
+// `CleanupError::Worker` if any other error occurs.
+async fn clean_log_range(
+    start_size: u64,
+    end_size: u64,
+    subrequests: &mut usize,
+    bucket: &Bucket,
+) -> Result<(), CleanupError> {
+    // Get tree tiles between the start and end sizes.
+    for tile in TlogTile::new_tiles(start_size, end_size) {
+        // Full tiles only. If the full tile exists, the corresponding partial tiles can be deleted.
+        if tile.width() == 1 << tile.height() {
+            if tile.level() == 0 {
+                // For level-0 tree tiles, delete the corresponding data and (optional) aux files.
+                delete_dir(
+                    &format!(
+                        "{}.p/",
+                        tile.with_data_path(StaticCTPendingLogEntry::DATA_TILE_PATH)
+                            .path()
+                    ),
+                    bucket,
+                    subrequests,
+                )
+                .await?;
+                if let Some(aux_path) = StaticCTPendingLogEntry::AUX_TILE_PATH {
+                    delete_dir(
+                        &format!("{}.p/", tile.with_data_path(aux_path).path()),
+                        bucket,
+                        subrequests,
+                    )
+                    .await?;
+                }
+            }
+            delete_dir(&format!("{}.p/", tile.path()), bucket, subrequests).await?;
+        }
+    }
+    Ok(())
+}
+
+// Delete all files in the specified directory.
+//
+// # Errors
+// Will return `CleanupError::Subrequests` and abort early if the subrequest
+// limit is reached before successfully deleting the directory, and will return
+// a `CleanupError::Worker` if any other error occurs.
+async fn delete_dir(
+    prefix: &str,
+    bucket: &Bucket,
+    subrequests: &mut usize,
+) -> Result<(), CleanupError> {
+    log::debug!("deleting {prefix}");
+    checked_add_subrequests(subrequests, 1)?;
+    let objects = bucket.list().prefix(prefix).execute().await?;
+    // TODO add binding to delete multiple keys from R2 bucket. Otherwise, we'll
+    // quickly hit workers subrequest limits.
+    // Tracking issue: <https://github.com/cloudflare/workers-rs/issues/780>
+    checked_add_subrequests(subrequests, objects.objects().len())?;
+    let futures = objects
+        .objects()
+        .iter()
+        .map(|obj| bucket.delete(obj.key()))
+        .collect::<Vec<_>>();
+    join_all(futures)
+        .await
+        .into_iter()
+        .collect::<Result<Vec<_>, worker::Error>>()?;
+    Ok(())
+}
+
+async fn cleaned_size(bucket: &Bucket) -> Result<u64, worker::Error> {
+    Ok(match bucket.get(CLEANED_SIZE_KEY).execute().await? {
+        Some(obj) => u64::from_be_bytes(
+            obj.body()
+                .ok_or("missing object body")?
+                .bytes()
+                .await?
+                .try_into()
+                .map_err(|_| "failed to read u64")?,
+        ),
+        None => 0,
+    })
+}
+
+async fn set_cleaned_size(size: u64, bucket: &Bucket) -> Result<(), worker::Error> {
+    bucket
+        .put(CLEANED_SIZE_KEY, size.to_be_bytes().to_vec())
+        .execute()
+        .await
+        .map(|_| ())
+}
+
+async fn current_log_size(
+    origin: &KeyName,
+    verifiers: &VerifierList,
+    bucket: &Bucket,
+) -> Result<u64, worker::Error> {
+    let checkpoint_bytes = bucket
+        .get(CHECKPOINT_KEY)
+        .execute()
+        .await?
+        .ok_or("failed to retrieve checkpoint from object storage")?
+        .body()
+        .ok_or("missing object body")?
+        .bytes()
+        .await?;
+    let checkpoint =
+        tlog_tiles::open_checkpoint(origin.as_str(), verifiers, now_millis(), &checkpoint_bytes)
+            .map_err(|e| e.to_string())?
+            .0;
+
+    Ok(checkpoint.size())
+}
+
+// Add to the subrequest count after checking that the new subrequests will not
+// put the worker over the limit.
+//
+// # Errors
+// Will return `CleanupError::Subrequests` if the additional subrequests would
+// cause the limit to be exceeded.
+fn checked_add_subrequests(subrequests: &mut usize, new: usize) -> Result<(), CleanupError> {
+    if *subrequests + new > SUBREQUEST_LIMIT {
+        return Err(CleanupError::Subrequests);
+    }
+    *subrequests += new;
+    Ok(())
+}
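For orientation, the ".p/" prefixes passed to delete_dir follow the tlog-tiles/static-ct-api path layout: partial tiles live under the full tile's path plus a ".p/" suffix, keyed by width. For the first 256-entry step of a log, the deleted prefixes would look roughly like this (illustrative keys; the exact strings come from TlogTile::path and with_data_path):

tile/0/000.p/      # partials (e.g. tile/0/000.p/128) of the full level-0 tree tile for entries [0, 256)
tile/data/000.p/   # partials of the corresponding data tile

Once the full tiles tile/0/000 and tile/data/000 exist, everything under these prefixes is redundant and safe to delete.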

crates/ct_worker/src/sequencer_do.rs

Lines changed: 4 additions & 29 deletions
@@ -5,12 +5,10 @@
 
 use std::time::Duration;
 
-use crate::{load_signing_key, load_witness_key, CONFIG};
+use crate::{load_checkpoint_signers, load_origin, CONFIG};
 use generic_log_worker::{load_public_bucket, GenericSequencer, SequencerConfig};
 use prometheus::Registry;
-use signed_note::KeyName;
-use static_ct_api::{StaticCTCheckpointSigner, StaticCTLogEntry};
-use tlog_tiles::{CheckpointSigner, Ed25519CheckpointSigner};
+use static_ct_api::StaticCTLogEntry;
 #[allow(clippy::wildcard_imports)]
 use worker::*;
 
@@ -30,36 +28,13 @@ impl DurableObject for Sequencer {
             .find(|(name, _)| id == namespace.id_from_name(name).unwrap().to_string())
             .expect("unable to find sequencer name");
 
-        // https://github.com/C2SP/C2SP/blob/main/static-ct-api.md#checkpoints
-        // The origin line MUST be the submission prefix of the log as a schema-less URL with no trailing slashes.
-        let origin = KeyName::new(
-            params
-                .submission_url
-                .trim_start_matches("http://")
-                .trim_start_matches("https://")
-                .trim_end_matches('/')
-                .to_string(),
-        )
-        .expect("invalid origin name");
+        let origin = load_origin(name);
         let sequence_interval = Duration::from_millis(params.sequence_interval_millis);
 
         // We don't use checkpoint extensions for CT
         let checkpoint_extension = Box::new(|_| vec![]);
 
-        let checkpoint_signers: Vec<Box<dyn CheckpointSigner>> = {
-            let signing_key = load_signing_key(&env, name).unwrap().clone();
-            let witness_key = load_witness_key(&env, name).unwrap().clone();
-
-            // Make the checkpoint signers from the secret keys and put them in a vec
-            let signer = StaticCTCheckpointSigner::new(origin.clone(), signing_key)
-                .map_err(|e| format!("could not create static-ct checkpoint signer: {e}"))
-                .unwrap();
-            let witness = Ed25519CheckpointSigner::new(origin.clone(), witness_key)
-                .map_err(|e| format!("could not create ed25519 checkpoint signer: {e}"))
-                .unwrap();
-
-            vec![Box::new(signer), Box::new(witness)]
-        };
+        let checkpoint_signers = load_checkpoint_signers(&env, name);
         let bucket = load_public_bucket(&env, name).unwrap();
         let registry = Registry::new();
 
crates/ct_worker/wrangler.jsonc

Lines changed: 6 additions & 1 deletion
@@ -55,7 +55,12 @@
         "Batcher"
       ]
     }
-  ]
+  ],
+  "triggers": {
+    "crons": [
+      "* * * * *"
+    ]
+  }
 },
 "cftest": {
   "build": {

crates/generic_log_worker/src/log_ops.rs

Lines changed: 3 additions & 1 deletion
@@ -50,7 +50,9 @@ use worker::Error as WorkerError;
 const DATA_TILE_LEVEL_KEY: u8 = u8::MAX;
 /// Same as above, anything above 63 is fine to use as the level key.
 const UNHASHED_TILE_LEVEL_KEY: u8 = u8::MAX - 1;
-const CHECKPOINT_KEY: &str = "checkpoint";
+/// Path used to store checkpoints, both in the object storage and lock backends.
+pub const CHECKPOINT_KEY: &str = "checkpoint";
+/// Path used to store staging bundles in the lock backend.
 const STAGING_KEY: &str = "staging";
 
 // Limit on the number of entries per batch. Tune this parameter to avoid
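Making CHECKPOINT_KEY pub lets the new cleanup cron job read the checkpoint from the public bucket (see current_log_size above) and verify it before trusting its size.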
