Skip to content

Commit 8ff3f8e

Browse files
committed
TODO: split into two commits.

- Add a PendingLogEntry struct to avoid overloading LogEntry. PendingLogEntry contains all of the fields of LogEntry except for the leaf index and timestamp.
- Instead of using one Sender per pool of entries, use one Sender per pending entry. This will allow us to split up the set of pending entries more easily, for example if we want to prefer creating full tiles instead of partial tiles (e.g., picking entries so that the log size is a multiple of 256, the full tile width).
1 parent 7fce31e commit 8ff3f8e

File tree

5 files changed

+146
-137
lines changed

5 files changed

+146
-137
lines changed

crates/ct_worker/src/batcher_do.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
use crate::{get_stub, load_cache_kv, LookupKey, QueryParams, SequenceMetadata};
1010
use base64::prelude::*;
1111
use futures_util::future::{join_all, select, Either};
12-
use static_ct_api::LogEntry;
12+
use static_ct_api::PendingLogEntry;
1313
use std::{
1414
collections::{HashMap, HashSet},
1515
time::Duration,
@@ -37,7 +37,7 @@ struct Batcher {
3737

3838
// A batch of entries to be submitted to the Sequencer together.
3939
struct Batch {
40-
pending_leaves: Vec<LogEntry>,
40+
pending_leaves: Vec<PendingLogEntry>,
4141
by_hash: HashSet<LookupKey>,
4242
done: Sender<HashMap<LookupKey, SequenceMetadata>>,
4343
}
@@ -68,7 +68,7 @@ impl DurableObject for Batcher {
6868
match req.path().as_str() {
6969
"/add_leaf" => {
7070
let name = &req.query::<QueryParams>()?.name;
71-
let entry: LogEntry = req.json().await?;
71+
let entry: PendingLogEntry = req.json().await?;
7272
let key = entry.lookup_key();
7373

7474
if self.in_flight >= MAX_IN_FLIGHT {

crates/ct_worker/src/ctlog.rs

Lines changed: 70 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use log::{debug, error, info, trace, warn};
3030
use p256::ecdsa::SigningKey as EcdsaSigningKey;
3131
use serde::{Deserialize, Serialize};
3232
use sha2::{Digest, Sha256};
33-
use static_ct_api::{LogEntry, TileIterator, TreeWithTimestamp};
33+
use static_ct_api::{LogEntry, PendingLogEntry, TileIterator, TreeWithTimestamp};
3434
use std::collections::HashMap;
3535
use std::time::Duration;
3636
use std::{
@@ -39,7 +39,7 @@ use std::{
3939
};
4040
use thiserror::Error;
4141
use tlog_tiles::{Hash, HashReader, PathElem, Tile, TlogError, TlogTile, HASH_SIZE};
42-
use tokio::sync::watch::{Receiver, Sender};
42+
use tokio::sync::watch::{channel, Receiver, Sender};
4343

4444
/// The maximum tile level is 63 (<c2sp.org/static-ct-api>), so safe to use [`u8::MAX`] as
4545
/// the special level for data tiles. The Go implementation uses -1.
@@ -75,65 +75,60 @@ pub(crate) struct LogConfig {
7575
/// <https://blog.cloudflare.com/durable-objects-easy-fast-correct-choose-three/#background-durable-objects-are-single-threaded>
7676
#[derive(Default, Debug)]
7777
pub(crate) struct PoolState {
78-
// Entries that are ready to be sequenced.
79-
pending_entries: Vec<LogEntry>,
78+
// Entries that are ready to be sequenced, along with the Sender that will
79+
// be updated when the entry is sequenced.
80+
pending_entries: Vec<PendingLogEntry>,
8081

81-
// Deduplication cache for entries currently pending sequencing.
82-
pending: HashMap<LookupKey, (u64, Receiver<SequenceMetadata>)>,
82+
// Callbacks for the pending entries.
83+
pending_senders: Vec<Sender<SequenceMetadata>>,
8384

84-
// Channel that will be updated when the current pool of pending entries is
85-
// sequenced.
86-
pending_done: Sender<SequenceMetadata>,
85+
// Deduplication cache for entries currently pending sequencing.
86+
pending: HashMap<LookupKey, Receiver<SequenceMetadata>>,
8787

8888
// Deduplication cache for entries currently being sequenced.
89-
in_sequencing: HashMap<LookupKey, (u64, Receiver<SequenceMetadata>)>,
89+
in_sequencing: HashMap<LookupKey, Receiver<SequenceMetadata>>,
9090
}
9191

9292
impl PoolState {
9393
// Check if the key is already in the pool. If so, return the index of the
9494
// entry in the pool, and a Receiver from which to read the entry metadata
9595
// when it is sequenced.
9696
fn check(&self, key: &LookupKey) -> Option<AddLeafResult> {
97-
if let Some((index, rx)) = self.in_sequencing.get(key) {
97+
if let Some(rx) = self.in_sequencing.get(key) {
9898
// Entry is being sequenced.
9999
Some(AddLeafResult::Pending {
100-
pool_index: *index,
101100
rx: rx.clone(),
102101
source: PendingSource::InSequencing,
103102
})
104103
} else {
105-
self.pending
106-
.get(key)
107-
.map(|(index, rx)| AddLeafResult::Pending {
108-
pool_index: *index,
109-
rx: rx.clone(),
110-
source: PendingSource::Pool,
111-
})
104+
self.pending.get(key).map(|rx| AddLeafResult::Pending {
105+
rx: rx.clone(),
106+
source: PendingSource::Pool,
107+
})
112108
}
113109
}
114110
// Add a new entry to the pool.
115-
fn add(&mut self, key: LookupKey, leaf: &LogEntry) -> AddLeafResult {
111+
fn add(&mut self, key: LookupKey, entry: &PendingLogEntry) -> AddLeafResult {
116112
if self.pending_entries.len() >= MAX_POOL_SIZE {
117113
return AddLeafResult::RateLimited;
118114
}
119-
self.pending_entries.push(leaf.clone());
120-
let pool_index = (self.pending_entries.len() as u64) - 1;
121-
let rx = self.pending_done.subscribe();
122-
self.pending.insert(key, (pool_index, rx.clone()));
115+
let (tx, rx) = channel((0, 0));
116+
self.pending_entries.push(entry.clone());
117+
self.pending_senders.push(tx);
118+
self.pending.insert(key, rx.clone());
123119

124120
AddLeafResult::Pending {
125-
pool_index,
126121
rx,
127122
source: PendingSource::Sequencer,
128123
}
129124
}
130125
// Take the entries from the pool that are ready to be sequenced and the
131126
// corresponding Senders to update when the entries have been sequenced.
132-
fn take(&mut self) -> (Vec<LogEntry>, Sender<SequenceMetadata>) {
127+
fn take(&mut self) -> (Vec<PendingLogEntry>, Vec<Sender<SequenceMetadata>>) {
133128
self.in_sequencing = std::mem::take(&mut self.pending);
134129
(
135130
std::mem::take(&mut self.pending_entries),
136-
std::mem::take(&mut self.pending_done),
131+
std::mem::take(&mut self.pending_senders),
137132
)
138133
}
139134
// Reset the map of in-sequencing entries. This should be called after
@@ -348,7 +343,6 @@ impl SequenceState {
348343
pub(crate) enum AddLeafResult {
349344
Cached(SequenceMetadata),
350345
Pending {
351-
pool_index: u64,
352346
rx: Receiver<SequenceMetadata>,
353347
source: PendingSource,
354348
},
@@ -361,15 +355,10 @@ impl AddLeafResult {
361355
pub(crate) async fn resolve(self) -> Option<SequenceMetadata> {
362356
match self {
363357
AddLeafResult::Cached(entry) => Some(entry),
364-
AddLeafResult::Pending {
365-
pool_index,
366-
mut rx,
367-
source: _,
368-
} => {
358+
AddLeafResult::Pending { mut rx, source: _ } => {
369359
// Wait until sequencing completes for this entry's pool.
370360
if rx.changed().await.is_ok() {
371-
let (first_index, timestamp) = *rx.borrow();
372-
Some((first_index + pool_index, timestamp))
361+
Some(*rx.borrow())
373362
} else {
374363
warn!("sender dropped");
375364
None
@@ -383,11 +372,7 @@ impl AddLeafResult {
383372
match self {
384373
AddLeafResult::Cached(_) => "cache",
385374
AddLeafResult::RateLimited => "ratelimit",
386-
AddLeafResult::Pending {
387-
pool_index: _,
388-
rx: _,
389-
source,
390-
} => match source {
375+
AddLeafResult::Pending { rx: _, source } => match source {
391376
PendingSource::InSequencing => "sequencing",
392377
PendingSource::Pool => "pool",
393378
PendingSource::Sequencer => "sequencer",
@@ -410,9 +395,9 @@ pub(crate) enum PendingSource {
410395
pub(crate) fn add_leaf_to_pool(
411396
state: &mut PoolState,
412397
cache: &impl CacheRead,
413-
leaf: &LogEntry,
398+
entry: &PendingLogEntry,
414399
) -> AddLeafResult {
415-
let hash = leaf.lookup_key();
400+
let hash = entry.lookup_key();
416401

417402
if let Some(result) = state.check(&hash) {
418403
// Entry is already pending or being sequenced.
@@ -422,7 +407,7 @@ pub(crate) fn add_leaf_to_pool(
422407
AddLeafResult::Cached(v)
423408
} else {
424409
// This is a new entry. Add it to the pool.
425-
state.add(hash, leaf)
410+
state.add(hash, entry)
426411
}
427412
}
428413

@@ -477,7 +462,7 @@ pub(crate) async fn sequence(
477462
cache: &mut impl CacheWrite,
478463
metrics: &Metrics,
479464
) -> Result<(), anyhow::Error> {
480-
let (pending_entries, pending_done) = pool_state.take();
465+
let (pending_entries, callbacks) = pool_state.take();
481466

482467
metrics
483468
.seq_pool_size
@@ -490,7 +475,7 @@ pub(crate) async fn sequence(
490475
lock,
491476
cache,
492477
pending_entries,
493-
pending_done,
478+
callbacks,
494479
metrics,
495480
)
496481
.await
@@ -543,8 +528,8 @@ async fn sequence_entries(
543528
object: &impl ObjectBackend,
544529
lock: &impl LockBackend,
545530
cache: &mut impl CacheWrite,
546-
mut pending_entries: Vec<LogEntry>,
547-
done: Sender<SequenceMetadata>,
531+
pending_entries: Vec<PendingLogEntry>,
532+
callbacks: Vec<Sender<SequenceMetadata>>,
548533
metrics: &Metrics,
549534
) -> Result<(), SequenceError> {
550535
let start = now_millis();
@@ -572,13 +557,16 @@ async fn sequence_entries(
572557
}
573558
let mut overlay = HashMap::new();
574559
let mut n = old_size;
575-
let mut sequenced_leaves: Vec<LogEntry> = Vec::new();
560+
let mut sequenced_entries: Vec<LogEntry> = Vec::new();
576561

577-
for leaf in &mut pending_entries {
578-
leaf.leaf_index = n;
579-
leaf.timestamp = timestamp;
580-
sequenced_leaves.push(leaf.clone());
581-
let tile_leaf = leaf.tile_leaf();
562+
for entry in pending_entries {
563+
let sequenced_entry = LogEntry {
564+
inner: entry,
565+
leaf_index: n,
566+
timestamp,
567+
};
568+
let tile_leaf = sequenced_entry.tile_leaf();
569+
let merkle_tree_leaf = sequenced_entry.merkle_tree_leaf();
582570
metrics.seq_leaf_size.observe(tile_leaf.len().as_f64());
583571
data_tile.extend(tile_leaf);
584572

@@ -587,15 +575,15 @@ async fn sequence_entries(
587575
// the new tiles).
588576
let hashes = tlog_tiles::stored_hashes(
589577
n,
590-
&leaf.merkle_tree_leaf(),
578+
&merkle_tree_leaf,
591579
&HashReaderWithOverlay {
592580
edge_tiles: &edge_tiles,
593581
overlay: &overlay,
594582
},
595583
)
596584
.map_err(|e| {
597585
SequenceError::NonFatal(format!(
598-
"couldn't compute new hashes for leaf {leaf:?}: {e}"
586+
"couldn't compute new hashes for leaf {sequenced_entry:?}: {e}",
599587
))
600588
})?;
601589
for (i, h) in hashes.iter().enumerate() {
@@ -611,6 +599,8 @@ async fn sequence_entries(
611599
metrics.seq_data_tile_size.observe(data_tile.len().as_f64());
612600
data_tile.clear();
613601
}
602+
603+
sequenced_entries.push(sequenced_entry);
614604
}
615605

616606
// Stage leftover partial data tile, if any.
@@ -731,24 +721,31 @@ async fn sequence_entries(
731721

732722
// Return SCTs to clients. Clients can recover the leaf index
733723
// from the old tree size and their index in the sequenced pool.
734-
done.send_replace((old_size, timestamp));
724+
for (pool_index, entry) in sequenced_entries.iter().enumerate() {
725+
callbacks[pool_index].send_replace((entry.leaf_index, entry.timestamp));
726+
}
735727

736728
// At this point if the cache put fails, there's no reason to return errors to users. The
737729
// only consequence of cache false negatives are duplicated leaves anyway. In fact, an
738730
// error might cause the clients to resubmit, producing more cache false negatives and
739731
// duplicates.
740732
if let Err(e) = cache
741733
.put_entries(
742-
&sequenced_leaves
734+
&sequenced_entries
743735
.iter()
744-
.map(|entry| (entry.lookup_key(), (entry.leaf_index, entry.timestamp)))
736+
.map(|entry| {
737+
(
738+
entry.inner.lookup_key(),
739+
(entry.leaf_index, entry.timestamp),
740+
)
741+
})
745742
.collect::<Vec<_>>(),
746743
)
747744
.await
748745
{
749746
warn!(
750747
"{name}: Cache put failed (entries={}): {e}",
751-
sequenced_leaves.len()
748+
sequenced_entries.len()
752749
);
753750
}
754751

@@ -1130,7 +1127,7 @@ mod tests {
11301127
for i in 0..500_u64 {
11311128
for k in 0..3000_u64 {
11321129
let certificate = (i * 3000 + k).to_be_bytes().to_vec();
1133-
let leaf = LogEntry {
1130+
let leaf = PendingLogEntry {
11341131
certificate,
11351132
..Default::default()
11361133
};
@@ -1305,9 +1302,9 @@ mod tests {
13051302

13061303
// A pair of duplicates from the in_sequencing pool.
13071304
let res21 = log.add_with_seed(is_precert, 2); // 7
1308-
let (pending_entries, pending_done) = log.pool_state.take();
1305+
let (pending_entries, callbacks) = log.pool_state.take();
13091306
let res22 = log.add_with_seed(is_precert, 2);
1310-
log.sequence_finish(pending_entries, pending_done);
1307+
log.sequence_finish(pending_entries, callbacks);
13111308
let entry21 = block_on(res21.resolve()).unwrap();
13121309
let entry22 = block_on(res22.resolve()).unwrap();
13131310
assert_eq!(entry21, entry22);
@@ -1880,8 +1877,8 @@ mod tests {
18801877
}
18811878
fn sequence_finish(
18821879
&mut self,
1883-
pending_entries: Vec<LogEntry>,
1884-
pending_done: Sender<SequenceMetadata>,
1880+
pending_entries: Vec<PendingLogEntry>,
1881+
callbacks: Vec<Sender<SequenceMetadata>>,
18851882
) {
18861883
block_on(sequence_entries(
18871884
&mut self.sequence_state,
@@ -1890,7 +1887,7 @@ mod tests {
18901887
&self.lock,
18911888
&mut self.cache,
18921889
pending_entries,
1893-
pending_done,
1890+
callbacks,
18941891
&self.metrics,
18951892
))
18961893
.unwrap();
@@ -1919,14 +1916,12 @@ mod tests {
19191916
pre_certificate = Vec::new();
19201917
}
19211918
let issuers = CHAINS[rng.gen_range(0..CHAINS.len())];
1922-
let leaf = LogEntry {
1919+
let leaf = PendingLogEntry {
19231920
certificate,
19241921
pre_certificate,
19251922
is_precert,
19261923
issuer_key_hash,
19271924
chain_fingerprints: issuers.iter().map(|&x| Sha256::digest(x).into()).collect(),
1928-
leaf_index: 0,
1929-
timestamp: 0,
19301925
};
19311926

19321927
block_on(upload_issuers(&self.object, issuers, &self.config.name)).unwrap();
@@ -2020,16 +2015,16 @@ mod tests {
20202015
tlog_tiles::record_hash(&entry.merkle_tree_leaf())
20212016
);
20222017

2023-
assert!(!entry.certificate.is_empty());
2024-
if entry.is_precert {
2025-
assert!(!entry.pre_certificate.is_empty());
2026-
assert_ne!(entry.issuer_key_hash, [0; 32]);
2018+
assert!(!entry.inner.certificate.is_empty());
2019+
if entry.inner.is_precert {
2020+
assert!(!entry.inner.pre_certificate.is_empty());
2021+
assert_ne!(entry.inner.issuer_key_hash, [0; 32]);
20272022
} else {
2028-
assert!(entry.pre_certificate.is_empty());
2029-
assert_eq!(entry.issuer_key_hash, [0; 32]);
2023+
assert!(entry.inner.pre_certificate.is_empty());
2024+
assert_eq!(entry.inner.issuer_key_hash, [0; 32]);
20302025
}
20312026

2032-
for fp in entry.chain_fingerprints {
2027+
for fp in entry.inner.chain_fingerprints {
20332028
let b = block_on(self.object.fetch(&format!("issuer/{}", hex::encode(fp))))
20342029
.unwrap()
20352030
.unwrap();

0 commit comments

Comments (0)