Skip to content
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
2404fbd
Made sequencer and batcher generic over the underlying log entry type…
May 21, 2025
0479622
Added a JsonSerialize trait so that PendingLogEntry doesn't have to i…
May 21, 2025
1130b6f
Revert "Added a JsonSerialize trait so that PendingLogEntry doesn't h…
May 21, 2025
971a9d1
Made Sequencer and Batcher generic, and made the ctlog DO impls of th…
May 22, 2025
44d306b
Made TileIterator generic
May 24, 2025
4424f3e
Made PendingLogEntryTrait an associated type of LogEntryTrait
May 24, 2025
97ae66e
Fix off-by-one error introduced in 073266a
May 24, 2025
37500bf
Started making open_checkpoint generic over verifiers
May 29, 2025
8d4dfd5
Removed old debugging statements
May 29, 2025
a7c9d4e
Replaced static-ct NoteSigner impls with CheckpointSigner; made TreeW…
May 30, 2025
2231fdb
Fixed doctests
May 30, 2025
d09cdd0
Made LogConfig hold CheckpointSigner rather than keys directly
May 30, 2025
daed56c
Renamed LogEntry/PendingLogEntry -> StaticCT*
May 30, 2025
51c96c7
Made the key loading function an input to Sequencer
Jun 2, 2025
f636a68
Renamed CtLogSequencer -> StaticCTSequencer
Jun 2, 2025
4f3be6c
Fix breakage from rebases
lukevalenta Jun 3, 2025
b746bd5
Fix linter complaints (clippy::pedantic)
lukevalenta Jun 3, 2025
db587a6
Add VerificationError to StaticCTError enum
lukevalenta Jun 3, 2025
62d473e
Use SequenceMetadata more consistently and fix other small nits
lukevalenta Jun 3, 2025
1519c64
Keep Sequencer and Batcher names for DOs to avoid changes in wrangler…
lukevalenta Jun 3, 2025
bbb3fd6
Clean up additional nits
lukevalenta Jun 3, 2025
6dd008a
Use UnixTimestamp from tlog_tiles crate within static_ct_api
lukevalenta Jun 3, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 24 additions & 12 deletions crates/ct_worker/src/batcher_do.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
use crate::{get_stub, load_cache_kv, LookupKey, QueryParams, SequenceMetadata};
use base64::prelude::*;
use futures_util::future::{join_all, select, Either};
use static_ct_api::PendingLogEntry;
use static_ct_api::{StaticCTPendingLogEntry, PendingLogEntryTrait};
use std::{
collections::{HashMap, HashSet},
time::Duration,
Expand All @@ -27,22 +27,35 @@ const MAX_BATCH_SIZE: usize = 100;
// The maximum amount of time to wait before submitting a batch.
const MAX_BATCH_TIMEOUT_MILLIS: u64 = 1_000;

#[durable_object]
struct Batcher {
struct Batcher<E: PendingLogEntryTrait> {
env: Env,
batch: Batch,
batch: Batch<E>,
in_flight: usize,
processed: usize,
}

#[durable_object]
struct CtLogBatcher(Batcher<StaticCTPendingLogEntry>);

#[durable_object]
impl DurableObject for CtLogBatcher {
fn new(state: State, env: Env) -> Self {
CtLogBatcher(Batcher::new(state, env))
}

async fn fetch(&mut self, req: Request) -> Result<Response> {
self.0.fetch(req).await
}
}

// A batch of entries to be submitted to the Sequencer together.
struct Batch {
pending_leaves: Vec<PendingLogEntry>,
struct Batch<E: PendingLogEntryTrait> {
pending_leaves: Vec<E>,
by_hash: HashSet<LookupKey>,
done: Sender<HashMap<LookupKey, SequenceMetadata>>,
}

impl Default for Batch {
impl<E: PendingLogEntryTrait> Default for Batch<E> {
/// Returns a batch initialized with a watch channel.
fn default() -> Self {
let (done, _) = watch::channel(HashMap::new());
Expand All @@ -54,9 +67,8 @@ impl Default for Batch {
}
}

#[durable_object]
impl DurableObject for Batcher {
fn new(state: State, env: Env) -> Self {
impl<E: PendingLogEntryTrait> Batcher<E> {
fn new(_: State, env: Env) -> Self {
Self {
env,
batch: Batch::default(),
Expand All @@ -68,7 +80,7 @@ impl DurableObject for Batcher {
match req.path().as_str() {
"/add_leaf" => {
let name = &req.query::<QueryParams>()?.name;
let entry: PendingLogEntry = req.json().await?;
let entry: E = req.json().await?;
let key = entry.lookup_key();

if self.in_flight >= MAX_IN_FLIGHT {
Expand Down Expand Up @@ -136,7 +148,7 @@ impl DurableObject for Batcher {
}
}

impl Batcher {
impl<E: PendingLogEntryTrait> Batcher<E> {
// Submit the current pending batch to be sequenced.
async fn submit_batch(&mut self, name: &str) -> Result<()> {
let stub = get_stub(&self.env, name, None, "SEQUENCER")?;
Expand Down
Loading