crates/ct_worker/src/ctlog.rs (86 additions, 80 deletions)
@@ -39,7 +39,7 @@ use std::{
};
use thiserror::Error;
use tlog_tiles::{Hash, HashReader, PathElem, Tile, TlogError, TlogTile, HASH_SIZE};
-use tokio::sync::watch::{self, Receiver, Sender};
+use tokio::sync::watch::{Receiver, Sender};

/// The maximum tile level is 63 (<c2sp.org/static-ct-api>), so it is safe to use [`u8::MAX`] as
/// the special level for data tiles. The Go implementation uses -1.
@@ -63,33 +63,10 @@ pub(crate) struct LogConfig {
pub(crate) sequence_interval: Duration,
}

-/// A pool of pending log entries that are sequenced together. Clients subscribe
-/// to pools to learn when their submitted entries have been processed.
-#[derive(Debug)]
-struct Pool {
-    pending_leaves: Vec<LogEntry>,
-    by_hash: HashMap<LookupKey, (u64, Receiver<SequenceMetadata>)>,
-    // Sends the index of the first sequenced entry in the pool,
-    // and the pool's sequencing timestamp.
-    done: Sender<SequenceMetadata>,
-}
-
-impl Default for Pool {
-    /// Returns a pool initialized with a watch channel.
-    fn default() -> Self {
-        let (tx, _) = watch::channel((0, 0));
-        Self {
-            pending_leaves: vec![],
-            by_hash: HashMap::new(),
-            done: tx,
-        }
-    }
-}
-
/// Ephemeral state for pooling entries to the CT log.
///
/// The pool is written to by `add_leaf_to_pool`, and by the sequencer
-/// when rotating `current_pool` and `in_sequencing`.
+/// when rotating `pending` and `in_sequencing`.
///
/// As long as the above-mentioned blocks run synchronously (no 'await's), Durable Objects'
/// single-threaded execution guarantees that `add_leaf_to_pool` will never add to a pool that
@@ -98,10 +75,17 @@ impl Default for Pool {
/// <https://blog.cloudflare.com/durable-objects-easy-fast-correct-choose-three/#background-durable-objects-are-single-threaded>
#[derive(Default, Debug)]
pub(crate) struct PoolState {
-    current_pool: Pool,
-    // in_sequencing is the [Pool::by_hash] map of the pool that's currently being
-    // sequenced. These entries might not be sequenced yet or might not yet be
-    // committed to the deduplication cache.
+    // Entries that are ready to be sequenced.
+    pending_entries: Vec<LogEntry>,
+
+    // Deduplication cache for entries currently pending sequencing.
+    pending: HashMap<LookupKey, (u64, Receiver<SequenceMetadata>)>,
+
+    // Channel that will be updated when the current pool of pending entries is
+    // sequenced.
+    pending_done: Sender<SequenceMetadata>,
+
+    // Deduplication cache for entries currently being sequenced.
in_sequencing: HashMap<LookupKey, (u64, Receiver<SequenceMetadata>)>,
}
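// Editor's note (not part of the diff): a minimal standalone model of why
// PoolState keeps two deduplication maps. A duplicate submitted while its
// twin is mid-sequencing must still deduplicate, so lookups consult
// `in_sequencing` before `pending`. `String` stands in for LookupKey and
// `u64` for the (index, Receiver) value; both are simplifications rather
// than the types used above.
use std::collections::HashMap;

struct TwoStageDedup {
    pending: HashMap<String, u64>,
    in_sequencing: HashMap<String, u64>,
}

impl TwoStageDedup {
    // Mirrors the shape of PoolState::get: prefer the pool currently being
    // sequenced, then fall back to the pending pool.
    fn get(&self, key: &str) -> Option<u64> {
        self.in_sequencing
            .get(key)
            .or_else(|| self.pending.get(key))
            .copied()
    }
}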

@@ -118,8 +102,7 @@ impl PoolState {
source: PendingSource::InSequencing,
})
} else {
-            self.current_pool
-                .by_hash
+            self.pending
.get(key)
.map(|(index, rx)| AddLeafResult::Pending {
pool_index: *index,
Expand All @@ -130,22 +113,34 @@ impl PoolState {
}
// Add a new entry to the pool.
fn add(&mut self, key: LookupKey, leaf: &LogEntry) -> AddLeafResult {
-        if self.current_pool.pending_leaves.len() >= MAX_POOL_SIZE {
+        if self.pending_entries.len() >= MAX_POOL_SIZE {
return AddLeafResult::RateLimited;
}
-        self.current_pool.pending_leaves.push(leaf.clone());
-        let pool_index = (self.current_pool.pending_leaves.len() as u64) - 1;
-        let rx = self.current_pool.done.subscribe();
-        self.current_pool
-            .by_hash
-            .insert(key, (pool_index, rx.clone()));
+        self.pending_entries.push(leaf.clone());
+        let pool_index = (self.pending_entries.len() as u64) - 1;
+        let rx = self.pending_done.subscribe();
+        self.pending.insert(key, (pool_index, rx.clone()));

AddLeafResult::Pending {
pool_index,
rx,
source: PendingSource::Sequencer,
}
}
+    // Take the entries from the pool that are ready to be sequenced, and the
+    // corresponding Sender to update when the entries have been sequenced.
+    fn take(&mut self) -> (Vec<LogEntry>, Sender<SequenceMetadata>) {
+        self.in_sequencing = std::mem::take(&mut self.pending);
+        (
+            std::mem::take(&mut self.pending_entries),
+            std::mem::take(&mut self.pending_done),
+        )
+    }
+    // Reset the map of in-sequencing entries. This should be called after
+    // sequencing completes.
+    fn reset(&mut self) {
+        self.in_sequencing.clear();
+    }
}
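// Editor's note (not part of the diff): `take` and `reset` above lean on
// std::mem::take, which swaps Default::default() into place and returns the
// old value, so pool rotation is a constant-time move with no cloning.
// Taking `pending_done` also leaves a fresh watch channel behind for the
// next pool, since tokio's watch::Sender is Default when its payload is;
// that is what allows deriving Default on PoolState and dropping the old
// manual Default impl for Pool. A freestanding sketch of the rotation,
// with Vec<u32> standing in for the real entry vector:
fn rotation_sketch() {
    let mut pending: Vec<u32> = vec![1, 2, 3];

    // Rotate: the pending batch moves out and `pending` is left empty.
    let mut in_sequencing = std::mem::take(&mut pending);
    assert!(pending.is_empty());
    assert_eq!(in_sequencing, vec![1, 2, 3]);

    // Once sequencing completes, drop the rotated batch (cf. `reset`).
    in_sequencing.clear();
}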

// State owned by the sequencing loop.
@@ -482,38 +477,47 @@ pub(crate) async fn sequence(
cache: &mut impl CacheWrite,
metrics: &Metrics,
) -> Result<(), anyhow::Error> {
-    let mut p = std::mem::take(&mut pool_state.current_pool);
-    pool_state.in_sequencing = std::mem::take(&mut p.by_hash);
+    let (pending_entries, pending_done) = pool_state.take();

metrics
.seq_pool_size
-        .observe(p.pending_leaves.len().as_f64());
+        .observe(pending_entries.len().as_f64());

-    let result =
-        match sequence_pool(sequence_state, config, object, lock, cache, &mut p, metrics).await {
-            Ok(()) => {
-                metrics.seq_count.with_label_values(&[""]).inc();
-                Ok(())
-            }
-            Err(SequenceError::Fatal(e)) => {
-                // Clear ephemeral sequencing state, as it may no longer be valid.
-                // It will be loaded again the next time sequence_pool is called.
-                metrics.seq_count.with_label_values(&["fatal"]).inc();
-                error!("{}: Fatal sequencing error {e}", config.name);
-                *sequence_state = None;
-                Err(anyhow!(e))
-            }
-            Err(SequenceError::NonFatal(e)) => {
-                metrics.seq_count.with_label_values(&["non-fatal"]).inc();
-                error!("{}: Non-fatal sequencing error {e}", config.name);
-                Ok(())
-            }
-        };
+    let result = match sequence_entries(
+        sequence_state,
+        config,
+        object,
+        lock,
+        cache,
+        pending_entries,
+        pending_done,
+        metrics,
+    )
+    .await
+    {
+        Ok(()) => {
+            metrics.seq_count.with_label_values(&[""]).inc();
+            Ok(())
+        }
+        Err(SequenceError::Fatal(e)) => {
+            // Clear ephemeral sequencing state, as it may no longer be valid.
+            // It will be loaded again the next time sequence_entries is called.
+            metrics.seq_count.with_label_values(&["fatal"]).inc();
+            error!("{}: Fatal sequencing error {e}", config.name);
+            *sequence_state = None;
+            Err(anyhow!(e))
+        }
+        Err(SequenceError::NonFatal(e)) => {
+            metrics.seq_count.with_label_values(&["non-fatal"]).inc();
+            error!("{}: Non-fatal sequencing error {e}", config.name);
+            Ok(())
+        }
+    };

-    // Once [sequence_pool] returns, the entries are either in the deduplication
+    // Once [sequence_entries] returns, the entries are either in the deduplication
// cache or finalized with an error. In the latter case, we don't want
// a resubmit to deduplicate against the failed sequencing.
-    pool_state.in_sequencing.clear();
+    pool_state.reset();

result
}
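// Editor's note (not part of the diff): the pending_done Sender taken above
// carries (first sequenced index, sequencing timestamp) once send_replace
// fires in sequence_entries below, and each submitter holds a Receiver plus
// its own pool index, so a client's global leaf index is old_size +
// pool_index. A self-contained sketch of that handshake using tokio's watch
// channel; the function name and concrete numbers are illustrative only.
async fn watch_handshake_sketch() {
    use tokio::sync::watch;

    // The pool's channel: (first sequenced index, timestamp), as in PoolState.
    let (done, _rx0) = watch::channel((0u64, 0u64));

    // Client side at submission time: subscribe and remember the pool index.
    let mut rx = done.subscribe();
    let pool_index: u64 = 2;

    // Sequencer side: the pool was written starting at old tree size 40.
    done.send_replace((40, 1_700_000_000_000));

    // Client side: observe the update and recover the global leaf index.
    rx.changed().await.unwrap();
    let (old_size, _timestamp) = *rx.borrow();
    assert_eq!(old_size + pool_index, 42);
}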
@@ -532,13 +536,15 @@ enum SequenceError {
/// If a non-fatal sequencing error occurs, pending requests will receive an error but the log will continue as normal.
/// If a fatal sequencing error occurs, the ephemeral log state must be reloaded before the next sequencing.
#[allow(clippy::too_many_lines)]
-async fn sequence_pool(
+#[allow(clippy::too_many_arguments)]
+async fn sequence_entries(
sequence_state: &mut Option<SequenceState>,
config: &LogConfig,
object: &impl ObjectBackend,
lock: &impl LockBackend,
cache: &mut impl CacheWrite,
-    p: &mut Pool,
+    mut pending_entries: Vec<LogEntry>,
+    done: Sender<SequenceMetadata>,
metrics: &Metrics,
) -> Result<(), SequenceError> {
let start = now_millis();
@@ -568,7 +574,7 @@ async fn sequence_pool(
let mut n = old_size;
let mut sequenced_leaves: Vec<LogEntry> = Vec::new();

-    for leaf in &mut p.pending_leaves {
+    for leaf in &mut pending_entries {
leaf.leaf_index = n;
leaf.timestamp = timestamp;
sequenced_leaves.push(leaf.clone());
@@ -725,7 +731,7 @@

// Return SCTs to clients. Clients can recover the leaf index
// from the old tree size and their index in the sequenced pool.
-    p.done.send_replace((old_size, timestamp));
+    done.send_replace((old_size, timestamp));

    // At this point if the cache put fails, there's no reason to return errors to users. The
    // only consequence of cache false negatives is duplicated leaves anyway. In fact, an
@@ -770,7 +776,7 @@
Ok(())
}

-// Stage a data tile. This is used as a helper function for [`sequence_pool`].
+// Stage a data tile. This is used as a helper function for [`sequence_entries`].
fn stage_data_tile(
n: u64,
edge_tiles: &mut HashMap<u8, TileWithBytes>,
@@ -1274,7 +1280,7 @@ mod tests {
log.add_with_seed(is_precert, rand::thread_rng().next_u64()); // 3
log.add_with_seed(is_precert, rand::thread_rng().next_u64()); // 4

-        // Two pairs of duplicates from the by_hash pool.
+        // Two pairs of duplicates from the pending pool.
let res01 = log.add_with_seed(is_precert, 0); // 5
let res02 = log.add_with_seed(is_precert, 0);
let res11 = log.add_with_seed(is_precert, 1); // 6
@@ -1299,9 +1305,9 @@

// A pair of duplicates from the in_sequencing pool.
let res21 = log.add_with_seed(is_precert, 2); // 7
-        let mut p = log.sequence_start();
+        let (pending_entries, pending_done) = log.pool_state.take();
let res22 = log.add_with_seed(is_precert, 2);
-        log.sequence_finish(&mut p);
+        log.sequence_finish(pending_entries, pending_done);
let entry21 = block_on(res21.resolve()).unwrap();
let entry22 = block_on(res22.resolve()).unwrap();
assert_eq!(entry21, entry22);
@@ -1872,23 +1878,23 @@ mod tests {
&self.metrics,
))
}
-    fn sequence_start(&mut self) -> Pool {
-        let mut p = std::mem::take(&mut self.pool_state.current_pool);
-        self.pool_state.in_sequencing = std::mem::take(&mut p.by_hash);
-        p
-    }
-    fn sequence_finish(&mut self, p: &mut Pool) {
-        block_on(sequence_pool(
+    fn sequence_finish(
+        &mut self,
+        pending_entries: Vec<LogEntry>,
+        pending_done: Sender<SequenceMetadata>,
+    ) {
+        block_on(sequence_entries(
&mut self.sequence_state,
&self.config,
&self.object,
&self.lock,
&mut self.cache,
-            p,
+            pending_entries,
+            pending_done,
&self.metrics,
))
.unwrap();
-        self.pool_state.in_sequencing.clear();
+        self.pool_state.reset();
}
fn add_certificate(&mut self) -> AddLeafResult {
self.add_certificate_with_seed(rand::thread_rng().next_u64())