
Commit 56fdac7

Refactor pool logic
* Clean up the AddLeafResult enum and add a source() method to replace the separate AddLeafResultSource enum.
* Move the majority of the logic in add_leaf_to_pool into methods of PoolState. This will allow us to more easily change how the pooling works in a future PR (e.g., #33).
* Remove the pool_size parameter from the configuration. It has never been useful to customize per log, so it is better to just make it a constant.
1 parent ca8f898 commit 56fdac7
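
A minimal sketch (not part of the commit) of the refactored call pattern, mirroring the sequencer_do.rs call site below. The names pool_state, cache, leaf, typ, and metrics stand in for the sequencer's own fields, and an async context is assumed:

```rust
// One return value now carries both the pending handle and its source label,
// so no separate AddLeafResultSource value is threaded back to the caller.
let result = ctlog::add_leaf_to_pool(&mut pool_state, &cache, &leaf);

// The metrics label comes straight from the result.
metrics
    .entry_count
    .with_label_values(&[typ, result.source()])
    .inc();

// Cached, Pending, and RateLimited are all collapsed by resolve():
// Some(metadata) once sequenced (or already cached), None if rate limited.
let sequenced = result.resolve().await;
```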

File tree

4 files changed: +85 / -76 lines

* crates/ct_worker/config.schema.json
* crates/ct_worker/config/src/lib.rs
* crates/ct_worker/src/ctlog.rs
* crates/ct_worker/src/sequencer_do.rs

crates/ct_worker/config.schema.json

Lines changed: 0 additions & 6 deletions
@@ -64,12 +64,6 @@
       "type": "string",
       "description": "Provide a hint to place the log in a specific geographic location. See https://developers.cloudflare.com/durable-objects/reference/data-location/ for supported locations. If unspecified, the Durable Object will be created in proximity to the first request."
     },
-    "pool_size": {
-      "type": "integer",
-      "minimum": 1,
-      "default": 4000,
-      "description": "The maximum number of entries to sequence at a time. See lib.rs for more information on the default."
-    },
     "sequence_interval": {
       "type": "integer",
       "minimum": 1,

crates/ct_worker/config/src/lib.rs

Lines changed: 0 additions & 8 deletions
@@ -27,18 +27,10 @@ pub struct LogParams {
     pub submission_url: String,
     pub temporal_interval: TemporalInterval,
     pub location_hint: Option<String>,
-    #[serde(default = "default_pool_size_seconds")]
-    pub pool_size: usize,
     #[serde(default = "default_sequence_interval_seconds")]
     pub sequence_interval: u64,
 }
 
-// Limit on the number of entries per batch. Tune this parameter to avoid running into various size limitations.
-// For instance, unexpectedly large leaves (e.g., with PQ signatures) could cause us to exceed the 128MB Workers memory limit. Storing 4000 10KB certificates is 40MB.
-fn default_pool_size_seconds() -> usize {
-    4000
-}
-
 fn default_sequence_interval_seconds() -> u64 {
     1
 }

crates/ct_worker/src/ctlog.rs

Lines changed: 83 additions & 57 deletions
@@ -47,14 +47,19 @@ const DATA_TILE_KEY: u8 = u8::MAX;
 const CHECKPOINT_KEY: &str = "checkpoint";
 const STAGING_KEY: &str = "staging";
 
+// Limit on the number of entries per batch. Tune this parameter to avoid
+// running into various size limitations. For instance, unexpectedly large
+// leaves (e.g., with PQ signatures) could cause us to exceed the 128MB Workers
+// memory limit. Storing 4000 10KB certificates is 40MB.
+const MAX_POOL_SIZE: usize = 4000;
+
 /// Configuration for a CT log.
 #[derive(Clone)]
 pub(crate) struct LogConfig {
     pub(crate) name: String,
     pub(crate) origin: String,
     pub(crate) signing_key: EcdsaSigningKey,
     pub(crate) witness_key: Ed25519SigningKey,
-    pub(crate) pool_size: usize,
     pub(crate) sequence_interval: Duration,
 }
 
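The comment above sizes the batch by memory. A standalone, illustrative sanity check of that arithmetic (the 10 KB leaf size and 128 MB Workers limit are taken from the comment; the helper constants are assumptions):

```rust
const MAX_POOL_SIZE: usize = 4000;
const LARGE_LEAF_BYTES: usize = 10 * 1024; // "10KB certificates" from the comment
const WORKERS_MEMORY_LIMIT_BYTES: usize = 128 * 1024 * 1024;

fn main() {
    // 4000 * 10 KiB = 40,960,000 bytes, roughly 40 MB.
    let worst_case_pool_bytes = MAX_POOL_SIZE * LARGE_LEAF_BYTES;
    assert!(worst_case_pool_bytes < WORKERS_MEMORY_LIMIT_BYTES);
    println!("worst-case pool is about {} MiB", worst_case_pool_bytes / (1024 * 1024));
}
```
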
@@ -78,6 +83,46 @@ pub(crate) struct PoolState {
     in_sequencing_done: Option<Receiver<SequenceMetadata>>,
 }
 
+impl PoolState {
+    // Check if the key is already in the pool. If so, return the index of the
+    // entry in the pool, and a Receiver from which to read the entry metadata
+    // when it is sequenced.
+    fn check(&self, key: &LookupKey) -> Option<AddLeafResult> {
+        if let Some(index) = self.in_sequencing.get(key) {
+            // Entry is being sequenced.
+            Some(AddLeafResult::Pending {
+                pool_index: *index,
+                rx: self.in_sequencing_done.clone().unwrap(),
+                source: PendingSource::InSequencing,
+            })
+        } else {
+            self.current_pool
+                .by_hash
+                .get(key)
+                .map(|index| AddLeafResult::Pending {
+                    pool_index: *index,
+                    rx: self.current_pool.done.subscribe(),
+                    source: PendingSource::Pool,
+                })
+        }
+    }
+    fn add(&mut self, key: LookupKey, leaf: &LogEntry) -> AddLeafResult {
+        // This is a new entry. Add it to the pool if there's room.
+        if self.current_pool.pending_leaves.len() >= MAX_POOL_SIZE {
+            return AddLeafResult::RateLimited;
+        }
+        self.current_pool.pending_leaves.push(leaf.clone());
+        let pool_index = (self.current_pool.pending_leaves.len() as u64) - 1;
+        self.current_pool.by_hash.insert(key, pool_index);
+
+        AddLeafResult::Pending {
+            pool_index,
+            rx: self.current_pool.done.subscribe(),
+            source: PendingSource::Sequencer,
+        }
+    }
+}
+
 // State owned by the sequencing loop.
 #[derive(Debug)]
 pub(crate) struct SequenceState {
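
An illustrative sketch (not in the commit) of how the new check()/add() split deduplicates submissions. It assumes a mutable PoolState in pool_state and a LogEntry in leaf, and runs inside this module since the methods are private:

```rust
let key = leaf.lookup_key();

// First submission: check() misses, so add() appends the leaf and records
// its pool index under the lookup key.
let first = match pool_state.check(&key) {
    Some(pending) => pending,
    None => pool_state.add(key, &leaf),
};
assert_eq!(first.source(), "sequencer");

// Re-submitting the same leaf is caught by check() and reuses the existing
// pool entry instead of growing the pool.
let duplicate = pool_state.check(&leaf.lookup_key()).expect("entry is pooled now");
assert_eq!(duplicate.source(), "pool");
```
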
@@ -282,7 +327,11 @@ impl SequenceState {
 /// entry or a pending entry that must be resolved.
 pub(crate) enum AddLeafResult {
     Cached(SequenceMetadata),
-    Pending((u64, Receiver<SequenceMetadata>)),
+    Pending {
+        pool_index: u64,
+        rx: Receiver<SequenceMetadata>,
+        source: PendingSource,
+    },
     RateLimited,
 }
 
@@ -292,7 +341,11 @@ impl AddLeafResult {
     pub(crate) async fn resolve(self) -> Option<SequenceMetadata> {
         match self {
             AddLeafResult::Cached(entry) => Some(entry),
-            AddLeafResult::Pending((pool_index, mut rx)) => {
+            AddLeafResult::Pending {
+                pool_index,
+                mut rx,
+                source: _,
+            } => {
                 // Wait until sequencing completes for this entry's pool.
                 if rx.changed().await.is_ok() {
                     let (first_index, timestamp) = *rx.borrow();
@@ -305,70 +358,50 @@
             AddLeafResult::RateLimited => None,
         }
     }
-}
-pub(crate) enum AddLeafResultSource {
-    InSequencing,
-    Pool,
-    Cache,
-    Sequencer,
-    RateLimit,
-}
 
-impl std::fmt::Display for AddLeafResultSource {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+    pub(crate) fn source(&self) -> &'static str {
         match self {
-            AddLeafResultSource::InSequencing => write!(f, "sequencing"),
-            AddLeafResultSource::Pool => write!(f, "pool"),
-            AddLeafResultSource::Cache => write!(f, "cache"),
-            AddLeafResultSource::Sequencer => write!(f, "sequencer"),
-            AddLeafResultSource::RateLimit => write!(f, "ratelimit"),
+            AddLeafResult::Cached(_) => "cache",
+            AddLeafResult::RateLimited => "ratelimit",
+            AddLeafResult::Pending {
+                pool_index: _,
+                rx: _,
+                source,
+            } => match source {
+                PendingSource::InSequencing => "sequencing",
+                PendingSource::Pool => "pool",
+                PendingSource::Sequencer => "sequencer",
+            },
         }
     }
 }
+pub(crate) enum PendingSource {
+    InSequencing,
+    Pool,
+    Sequencer,
+}
 
 /// Add a leaf (a certificate or pre-certificate) to the pool of pending entries.
 ///
-/// If the entry is has already been sequenced and is in the cache, return immediately
+/// If the entry has already been sequenced and is in the cache, return immediately
 /// with a [`AddLeafResult::Cached`]. If the pool is full, return
 /// [`AddLeafResult::RateLimited`]. Otherwise, return a [`AddLeafResult::Pending`] which
 /// can be resolved once the entry has been sequenced.
 pub(crate) fn add_leaf_to_pool(
     state: &mut PoolState,
-    pool_size: usize,
     cache: &impl CacheRead,
     leaf: &LogEntry,
-) -> (AddLeafResult, AddLeafResultSource) {
+) -> AddLeafResult {
     let hash = leaf.lookup_key();
-    let pool_index: u64;
-    let rx: Receiver<SequenceMetadata>;
-    let source: AddLeafResultSource;
-
-    if let Some(index) = state.in_sequencing.get(&hash) {
-        // Entry is being sequenced.
-        pool_index = *index;
-        rx = state.in_sequencing_done.clone().unwrap();
-        source = AddLeafResultSource::InSequencing;
-    } else if let Some(index) = state.current_pool.by_hash.get(&hash) {
-        // Entry is already pending.
-        pool_index = *index;
-        rx = state.current_pool.done.subscribe();
-        source = AddLeafResultSource::Pool;
+
+    if let Some(result) = state.check(&hash) {
+        result
     } else if let Some(v) = cache.get_entry(&hash) {
         // Entry is cached.
-        return (AddLeafResult::Cached(v), AddLeafResultSource::Cache);
+        AddLeafResult::Cached(v)
     } else {
-        // This is a new entry. Add it to the pool.
-        if pool_size > 0 && state.current_pool.pending_leaves.len() >= pool_size {
-            return (AddLeafResult::RateLimited, AddLeafResultSource::RateLimit);
-        }
-        state.current_pool.pending_leaves.push(leaf.clone());
-        pool_index = (state.current_pool.pending_leaves.len() as u64) - 1;
-        state.current_pool.by_hash.insert(hash, pool_index);
-        rx = state.current_pool.done.subscribe();
-        source = AddLeafResultSource::Sequencer;
-    };
-
-    (AddLeafResult::Pending((pool_index, rx)), source)
+        state.add(hash, leaf)
+    }
 }
 
 /// Uploads any newly-observed issuers to the object backend, returning the paths of those uploaded.
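
Illustrative only, following the doc comment above: a caller can branch on the rate-limited case before resolving the rest. The async context and the pool_state, cache, and leaf names are assumptions:

```rust
match add_leaf_to_pool(&mut pool_state, &cache, &leaf) {
    // The pool for this batch is full; surface a retryable error to the client.
    AddLeafResult::RateLimited => { /* e.g. respond with 429 Too Many Requests */ }
    // Cached entries resolve immediately; Pending entries wait for the
    // sequencing loop to publish this pool's sequence metadata.
    other => {
        if let Some(metadata) = other.resolve().await {
            // Build the response from the sequenced metadata.
            let _ = metadata;
        }
    }
}
```
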
@@ -1093,7 +1126,7 @@ mod tests {
                 certificate,
                 ..Default::default()
             };
-            add_leaf_to_pool(&mut log.pool_state, log.config.pool_size, &log.cache, &leaf);
+            add_leaf_to_pool(&mut log.pool_state, &log.cache, &leaf);
         }
         log.sequence().unwrap();
     }
@@ -1811,7 +1844,6 @@ mod tests {
             origin: "example.com/TestLog".to_string(),
             witness_key: Ed25519SigningKey::generate(&mut OsRng),
             signing_key: EcdsaSigningKey::random(&mut OsRng),
-            pool_size: 0,
             sequence_interval: Duration::from_secs(1),
         };
         let pool_state = PoolState::default();
@@ -1892,13 +1924,7 @@
 
         block_on(upload_issuers(&self.object, issuers, &self.config.name)).unwrap();
 
-        add_leaf_to_pool(
-            &mut self.pool_state,
-            self.config.pool_size,
-            &self.cache,
-            &leaf,
-        )
-        .0
+        add_leaf_to_pool(&mut self.pool_state, &self.cache, &leaf)
     }
 
     fn check(&self, size: u64) -> u64 {

crates/ct_worker/src/sequencer_do.rs

Lines changed: 2 additions & 5 deletions
@@ -161,7 +161,6 @@ impl Sequencer {
             origin: origin.to_string(),
             signing_key,
             witness_key,
-            pool_size: params.pool_size,
             sequence_interval,
         });
         self.public_bucket = Some(ObjectBucket {
@@ -223,7 +222,6 @@
     // are omitted.
     async fn add_batch(&mut self, pending_entries: &[LogEntry]) -> Result<Response> {
         // Safe to unwrap config here as the log must be initialized.
-        let config = self.config.as_ref().unwrap();
         let mut futures = Vec::with_capacity(pending_entries.len());
         for pending_entry in pending_entries {
             let typ = if pending_entry.is_precert {
@@ -232,16 +230,15 @@
                 "add-chain"
             };
 
-            let (add_leaf_result, source) = ctlog::add_leaf_to_pool(
+            let add_leaf_result = ctlog::add_leaf_to_pool(
                 &mut self.pool_state,
-                config.pool_size,
                 self.cache.as_ref().unwrap(),
                 pending_entry,
             );
 
             self.metrics
                 .entry_count
-                .with_label_values(&[typ, &source.to_string()])
+                .with_label_values(&[typ, add_leaf_result.source()])
                 .inc();
 
             futures.push(add_leaf_result.resolve());
