
Commit bb8d9a3

Add partial tile cleanup utility, fixes #1
The tlog-tiles and static-ct-api specs allow partial tiles to be deleted when the corresponding full tile is available. This helps to reduce R2 storage costs, but incurs extra cost for the R2 list and delete operations.

- Add a `Cleaner` Durable Object, which iterates over a log and, for each full tile available in the public bucket, lists and deletes any corresponding partial tiles. The cleaner tracks subrequests to prevent an alarm invocation from exceeding the limit of 1000 subrequests.
- (bonus) Lay the groundwork for implementing a tlog-witness or tlog-mirror as a service that periodically updates based on a target log's latest checkpoint.

Other changes:

- Update to worker 0.6.1 for R2 `delete_multiple` support and `PartialEq` support for ObjectIds.
- Move more Durable Object initialization logic from the MTC and CT applications to the `generic_log_worker` crate to deduplicate code.
- Fix the tree time metric to report seconds.
1 parent 6239872 commit bb8d9a3
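The cleanup behavior described in the commit message can be pictured roughly as follows. This is an illustrative sketch only, not the `GenericCleaner` implementation: the `ObjectStore` trait, the `clean_once` helper, and the two-subrequests-per-tile accounting are assumptions standing in for the real R2 bucket bindings and budget tracking.

/// Illustrative storage abstraction standing in for the R2 public bucket.
trait ObjectStore {
    fn list(&mut self, prefix: &str) -> Vec<String>;
    fn delete_multiple(&mut self, keys: Vec<String>);
}

/// One cleaner run: for each full tile already present in the bucket, list and
/// delete any leftover partial tiles under "<full tile path>.p/", stopping
/// before the per-invocation subrequest budget runs out. Returns the number of
/// full tiles processed.
fn clean_once<S: ObjectStore>(
    store: &mut S,
    full_tile_paths: impl Iterator<Item = String>,
    mut subrequest_budget: usize,
) -> usize {
    let mut cleaned = 0;
    for path in full_tile_paths {
        // Budget at least two subrequests per tile: one list plus one batched delete.
        if subrequest_budget < 2 {
            break;
        }
        let partials = store.list(&format!("{path}.p/"));
        subrequest_budget -= 1;
        if !partials.is_empty() {
            store.delete_multiple(partials);
            subrequest_budget -= 1;
        }
        cleaned += 1;
    }
    cleaned
}

In the real worker the budget also has to cover fetching and verifying the latest checkpoint, which is presumably why the schema change below quotes 498 tiles per run rather than a round 500.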

File tree

23 files changed: +617, -159 lines


Cargo.lock

Lines changed: 6 additions & 8 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -71,7 +71,7 @@ thiserror = "2.0"
 tlog_tiles = { path = "crates/tlog_tiles", version = "0.2.0" }
 tokio = { version = "1", features = ["sync"] }
 url = "2.2"
-worker = "0.6.0"
+worker = "0.6.1"
 x509-cert = "0.2.5"
 x509-verify = { version = "0.4.4", features = [
     "md2",

crates/ct_worker/Cargo.toml

Lines changed: 0 additions & 1 deletion
@@ -57,7 +57,6 @@ tlog_tiles.workspace = true
 worker.workspace = true
 x509-cert.workspace = true
 x509_util.workspace = true
-prometheus.workspace = true
 chrono.workspace = true
 base64ct.workspace = true
 csv.workspace = true

crates/ct_worker/config.schema.json

Lines changed: 6 additions & 0 deletions
@@ -113,6 +113,12 @@
       "type": "boolean",
       "default": true,
       "description": "Enables loading root store trusted roots from the CCADB list, in addition to any roots configured in `roots.<env>.pem`. If enabled, requires a KV namespace with the binding `ccadb_roots` to be configured in `wrangler.jsonc`, as well as a cron trigger so that the CCADB list auto-updates."
+    },
+    "clean_interval_secs": {
+      "type": "integer",
+      "minimum": 1,
+      "default": 60,
+      "description": "How long to wait in between runs of the partial tile cleaner. For static CT, the cleaner can clean 498 tiles (127,488 entries) per run before hitting the Workers limit of 1000 subrequests, so the default of once every 60 seconds should keep up with a log that grows at 2000 entries/second."
     }
   },
   "required": [

crates/ct_worker/config/src/lib.rs

Lines changed: 2 additions & 0 deletions
@@ -42,6 +42,8 @@ pub struct LogParams {
     pub enable_dedup: bool,
     #[serde(default = "default_bool::<true>")]
     pub enable_ccadb_roots: bool,
+    #[serde(default = "default_u64::<60>")]
+    pub clean_interval_secs: u64,
 }
 
 fn default_bool<const V: bool>() -> bool {

crates/ct_worker/src/batcher_do.rs

Lines changed: 13 additions & 35 deletions
@@ -1,5 +1,5 @@
 use crate::CONFIG;
-use generic_log_worker::{get_durable_object_stub, load_cache_kv, BatcherConfig, GenericBatcher};
+use generic_log_worker::{get_durable_object_name, BatcherConfig, GenericBatcher, BATCHER_BINDING};
 #[allow(clippy::wildcard_imports)]
 use worker::*;
 
@@ -8,46 +8,24 @@ struct Batcher(GenericBatcher);
 
 impl DurableObject for Batcher {
     fn new(state: State, env: Env) -> Self {
-        // Find the Durable Object name by enumerating all possibilities.
-        // TODO after update to worker > 0.6.0 use ObjectId::equals for comparison.
-        let id = state.id().to_string();
-        let namespace = env.durable_object("BATCHER").unwrap();
-        let (name, params) = CONFIG
-            .logs
-            .iter()
-            .find(|(name, params)| {
-                for shard_id in 0..params.num_batchers {
-                    if id
-                        == namespace
-                            .id_from_name(&format!("{name}_{shard_id:x}"))
-                            .unwrap()
-                            .to_string()
-                    {
-                        return true;
-                    }
-                }
-                false
-            })
-            .expect("unable to find batcher name");
-        let kv = if params.enable_dedup {
-            Some(load_cache_kv(&env, name).unwrap())
-        } else {
-            None
-        };
-        let sequencer = get_durable_object_stub(
+        let name = get_durable_object_name(
             &env,
-            name,
-            None,
-            "SEQUENCER",
-            params.location_hint.as_deref(),
-        )
-        .unwrap();
+            &state,
+            BATCHER_BINDING,
+            &mut CONFIG
+                .logs
+                .iter()
+                .map(|(name, params)| (name.as_str(), params.num_batchers)),
+        );
+        let params = &CONFIG.logs[name];
         let config = BatcherConfig {
            name: name.to_string(),
            max_batch_entries: params.max_batch_entries,
            batch_timeout_millis: params.batch_timeout_millis,
+           enable_dedup: params.enable_dedup,
+           location_hint: params.location_hint.clone(),
         };
-        Batcher(GenericBatcher::new(config, kv, sequencer))
+        Batcher(GenericBatcher::new(&env, config))
     }
 
     async fn fetch(&self, req: Request) -> Result<Response> {
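The shared `get_durable_object_name` helper isn't shown in this commit, but its behavior can be inferred from the code it replaces: enumerate the candidate names (the bare log name, or `<name>_<shard:x>` for sharded objects like the batcher), derive each candidate's Durable Object id, and compare it with the object's own id, which worker 0.6.1 makes possible directly via `PartialEq` on `ObjectId`. A hedged sketch, with the signature inferred from the call sites rather than taken from `generic_log_worker`:

use worker::{Env, State};

/// Sketch only: candidates are (log name, shard count) pairs; a count of 0
/// means the object is unsharded and addressed by the bare log name.
pub fn get_durable_object_name<'a>(
    env: &Env,
    state: &State,
    binding: &str,
    candidates: &mut dyn Iterator<Item = (&'a str, usize)>,
) -> &'a str {
    let namespace = env.durable_object(binding).unwrap();
    let id = state.id();
    candidates
        .find(|(name, shards)| {
            if *shards == 0 {
                // Unsharded: the log name maps directly to a single object id.
                return namespace.id_from_name(name).unwrap() == id;
            }
            // Sharded: the old batcher code addressed shards as "<name>_<shard:x>".
            (0..*shards).any(|shard| {
                namespace
                    .id_from_name(&format!("{name}_{shard:x}"))
                    .unwrap()
                    == id
            })
        })
        .map(|(name, _)| name)
        .expect("unable to find durable object name")
}

Passing a shard count of 0 for the sequencer and cleaner lets the same helper serve both sharded and unsharded Durable Objects.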

crates/ct_worker/src/cleaner_do.rs

Lines changed: 48 additions & 0 deletions
@@ -0,0 +1,48 @@
+use std::time::Duration;
+
+use crate::{load_checkpoint_signers, load_origin, CONFIG};
+use generic_log_worker::{get_durable_object_name, CleanerConfig, GenericCleaner, CLEANER_BINDING};
+use signed_note::VerifierList;
+use static_ct_api::StaticCTPendingLogEntry;
+use tlog_tiles::PendingLogEntry;
+#[allow(clippy::wildcard_imports)]
+use worker::*;
+
+#[durable_object(alarm)]
+struct Cleaner(GenericCleaner);
+
+impl DurableObject for Cleaner {
+    fn new(state: State, env: Env) -> Self {
+        let name = get_durable_object_name(
+            &env,
+            &state,
+            CLEANER_BINDING,
+            &mut CONFIG.logs.keys().map(|name| (name.as_str(), 0)),
+        );
+        let params = &CONFIG.logs[name];
+
+        let config = CleanerConfig {
+            name: name.to_string(),
+            origin: load_origin(name),
+            data_path: StaticCTPendingLogEntry::DATA_TILE_PATH,
+            aux_path: StaticCTPendingLogEntry::AUX_TILE_PATH,
+            verifiers: VerifierList::new(
+                load_checkpoint_signers(&env, name)
+                    .iter()
+                    .map(|s| s.verifier())
+                    .collect(),
+            ),
+            clean_interval: Duration::from_secs(params.clean_interval_secs),
+        };
+
+        Cleaner(GenericCleaner::new(&state, &env, config))
+    }
+
+    async fn fetch(&self, req: Request) -> Result<Response> {
+        self.0.fetch(req).await
+    }
+
+    async fn alarm(&self) -> Result<Response> {
+        self.0.alarm().await
+    }
+}
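For context on the paths the cleaner operates on: per the tlog-tiles layout (which the static-ct-api data tiles follow as well), a tile index is encoded as base-1000 groups, each zero-padded to three digits and all but the last prefixed with `x`, and a partial tile of width W lives at `<full tile path>.p/<W>`. A small illustration of that encoding; this is my reading of the spec, not code from this repository:

// Encode a tile index the way the tlog-tiles spec describes (e.g. 1234067
// becomes "x001/x234/067"); partial tiles then live under "<path>.p/<W>".
fn tile_index_path(mut n: u64) -> String {
    let mut path = format!("{:03}", n % 1000);
    while n >= 1000 {
        n /= 1000;
        path = format!("x{:03}/{path}", n % 1000);
    }
    path
}

fn main() {
    let full = format!("tile/data/{}", tile_index_path(1_234_067)); // tile/data/x001/x234/067
    let partial_prefix = format!("{full}.p/");                      // tile/data/x001/x234/067.p/
    println!("full tile:      {full}");
    println!("partial prefix: {partial_prefix}");
}

Listing the bucket under that `.p/` prefix is what lets the cleaner find every leftover partial width for a tile in a single R2 list call, and worker 0.6.1's `delete_multiple` removes them in one batch.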

crates/ct_worker/src/lib.rs

Lines changed: 1 addition & 0 deletions
@@ -18,6 +18,7 @@ use crate::ccadb_roots_cron::update_ccadb_roots;
 
 mod batcher_do;
 mod ccadb_roots_cron;
+mod cleaner_do;
 mod frontend_worker;
 mod sequencer_do;
 
crates/ct_worker/src/sequencer_do.rs

Lines changed: 16 additions & 26 deletions
@@ -6,8 +6,9 @@
 use std::time::Duration;
 
 use crate::{load_checkpoint_signers, load_origin, CONFIG};
-use generic_log_worker::{load_public_bucket, GenericSequencer, SequencerConfig};
-use prometheus::Registry;
+use generic_log_worker::{
+    get_durable_object_name, GenericSequencer, SequencerConfig, SEQUENCER_BINDING,
+};
 use static_ct_api::StaticCTLogEntry;
 #[allow(clippy::wildcard_imports)]
 use worker::*;
@@ -17,38 +18,27 @@ struct Sequencer(GenericSequencer<StaticCTLogEntry>);
 
 impl DurableObject for Sequencer {
     fn new(state: State, env: Env) -> Self {
-        // Find the Durable Object name by enumerating all possibilities.
-        // TODO after update to worker > 0.6.0 use ObjectId::equals for comparison.
-        let id = state.id().to_string();
-        let namespace = env.durable_object("SEQUENCER").unwrap();
-        let (name, params) = CONFIG
-            .logs
-            .iter()
-            .find(|(name, _)| id == namespace.id_from_name(name).unwrap().to_string())
-            .expect("unable to find sequencer name");
-
-        let origin = load_origin(name);
-        let sequence_interval = Duration::from_millis(params.sequence_interval_millis);
-
-        // We don't use checkpoint extensions for CT
-        let checkpoint_extension = Box::new(|_| vec![]);
-
-        let checkpoint_signers = load_checkpoint_signers(&env, name);
-        let bucket = load_public_bucket(&env, name).unwrap();
-        let registry = Registry::new();
+        let name = get_durable_object_name(
+            &env,
+            &state,
+            SEQUENCER_BINDING,
+            &mut CONFIG.logs.keys().map(|name| (name.as_str(), 0)),
+        );
+        let params = &CONFIG.logs[name];
 
         let config = SequencerConfig {
             name: name.to_string(),
-            origin,
-            checkpoint_signers,
-            checkpoint_extension,
-            sequence_interval,
+            origin: load_origin(name),
+            checkpoint_signers: load_checkpoint_signers(&env, name),
+            checkpoint_extension: Box::new(|_| vec![]), // no checkpoint extensions for CT
+            sequence_interval: Duration::from_millis(params.sequence_interval_millis),
             max_sequence_skips: params.max_sequence_skips,
             enable_dedup: params.enable_dedup,
             sequence_skip_threshold_millis: params.sequence_skip_threshold_millis,
+            location_hint: params.location_hint.clone(),
         };
 
-        Sequencer(GenericSequencer::new(config, state, bucket, registry))
+        Sequencer(GenericSequencer::new(state, env, config))
     }
 
     async fn fetch(&self, req: Request) -> Result<Response> {

crates/ct_worker/wrangler.jsonc

Lines changed: 20 additions & 0 deletions
@@ -55,6 +55,10 @@
       {
         "name": "BATCHER",
         "class_name": "Batcher"
+      },
+      {
+        "name": "CLEANER",
+        "class_name": "Cleaner"
       }
     ]
   },
@@ -66,6 +70,12 @@
         "Sequencer",
         "Batcher"
       ]
+    },
+    {
+      "tag": "v2",
+      "new_sqlite_classes": [
+        "Cleaner"
+      ]
     }
   ]
 },
@@ -112,6 +122,10 @@
       {
         "name": "BATCHER",
         "class_name": "Batcher"
+      },
+      {
+        "name": "CLEANER",
+        "class_name": "Cleaner"
       }
     ]
   },
@@ -123,6 +137,12 @@
         "Sequencer",
         "Batcher"
       ]
+    },
+    {
+      "tag": "v2",
+      "new_sqlite_classes": [
+        "Cleaner"
+      ]
     }
   ]
 }
