Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 40 additions & 7 deletions crates/ct_worker/config.schema.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"additionalProperties": false,
"properties": {
"logging_level": {
"type": "string",
Expand All @@ -15,9 +16,11 @@
"logs": {
"type": "object",
"description": "Dictionary CT log shard names to configurations.",
"additionalProperties": false,
"patternProperties": {
"^[a-zA-Z0-9_]+$": {
"type": "object",
"additionalProperties": false,
"properties": {
"description": {
"type": "string",
Expand All @@ -43,6 +46,7 @@
},
"temporal_interval": {
"type": "object",
"additionalProperties": false,
"properties": {
"start_inclusive": {
"type": "string",
Expand All @@ -64,16 +68,45 @@
"type": "string",
"description": "Provide a hint to place the log in a specific geographic location. See https://developers.cloudflare.com/durable-objects/reference/data-location/ for supported locations. If unspecified, the Durable Object will be created in proximity to the first request."
},
"sequence_interval_seconds": {
"sequence_interval_millis": {
"type": "integer",
"minimum": 1,
"default": 1,
"description": "The duration in between sequencing operations, in seconds."
"minimum": 100,
"default": 1000,
"description": "The duration in between sequencing operations, in milliseconds."
},
"max_sequence_skips": {
"type": "integer",
"default": 0,
"description": "The maximum number of times sequencing can be skipped to avoid creating partial tiles. If non-zero, pending entries may be delayed by either a multiple of the sequence interval or sequence_skip_threshold_millis if set."
},
"max_pending_entry_holds": {
"sequence_skip_threshold_millis": {
"type": "integer",
"default": 1,
"description": "The maximum number of times a pending entry can be held back from sequencing to avoid creating partial tiles. If non-zero, pending entries may be delayed by a multiple of sequence interval."
"default": 0,
"description": "If non-zero, entries will only be skipped by sequencing (when max_sequenced_skips is non-zero) if they have been in the pool for less than this timeout."
},
"num_batchers": {
"type": "integer",
"minimum": 0,
"default": 8,
"maximum": 255,
"description": "The number of batchers to use to proxy requests to the sequencer. If zero, requests from the frontend worker go directly to the sequencer."
},
"batch_timeout_millis": {
"type": "integer",
"minimum": 100,
"default": 1000,
"description": "The maximum duration to wait before submitting a batch to the sequencer, in milliseconds."
},
"max_batch_entries": {
"type": "integer",
"minimum": 1,
"default": 1000,
"description": "The maximum number of entries per batch."
},
"disable_dedup": {
"type": "boolean",
"default": false,
"description": "Disable checking the deduplication cache for add-(pre-)chain requests (e.g., for tests or benchmarks)."
}
},
"required": [
Expand Down
37 changes: 29 additions & 8 deletions crates/ct_worker/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,37 @@ pub struct LogParams {
pub submission_url: String,
pub temporal_interval: TemporalInterval,
pub location_hint: Option<String>,
#[serde(default = "default_sequence_interval_seconds")]
pub sequence_interval_seconds: u64,
#[serde(default = "default_max_pending_entry_holds")]
pub max_pending_entry_holds: usize,
#[serde(default = "default_sequence_interval_millis")]
pub sequence_interval_millis: u64,
#[serde(default = "default_max_sequence_skips")]
pub max_sequence_skips: usize,
pub sequence_skip_threshold_millis: Option<u64>,
#[serde(default = "default_num_batchers")]
pub num_batchers: u8,
#[serde(default = "default_batch_timeout_millis")]
pub batch_timeout_millis: u64,
#[serde(default = "default_max_batch_entries")]
pub max_batch_entries: usize,
#[serde(default)]
pub disable_dedup: bool,
}

/// Serde default for `LogParams::sequence_interval_millis`: one second
/// between sequencing operations (matches `config.schema.json`).
fn default_sequence_interval_millis() -> u64 {
    1_000
}

/// Serde default for `LogParams::max_sequence_skips`: zero, i.e. sequencing
/// is never skipped to avoid partial tiles (matches `config.schema.json`).
fn default_max_sequence_skips() -> usize {
    0
}

/// Serde default for `LogParams::num_batchers`: eight batcher shards in
/// front of the sequencer (matches `config.schema.json`).
fn default_num_batchers() -> u8 {
    8
}

fn default_sequence_interval_seconds() -> u64 {
1
/// Serde default for `LogParams::batch_timeout_millis`: wait at most one
/// second before submitting a partial batch (matches `config.schema.json`).
fn default_batch_timeout_millis() -> u64 {
    1_000
}

fn default_max_pending_entry_holds() -> usize {
1
/// Serde default for `LogParams::max_batch_entries`: maximum number of
/// entries accumulated before a batch is submitted to the sequencer.
// NOTE(review): keep this value in sync with the `max_batch_entries`
// default documented in config.schema.json.
fn default_max_batch_entries() -> usize {
    100
}
38 changes: 14 additions & 24 deletions crates/ct_worker/src/batcher_do.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
//!
//! Entries are assigned to Batcher shards with consistent hashing on the cache key.

use crate::{get_stub, load_cache_kv, LookupKey, QueryParams, SequenceMetadata};
use crate::{
get_stub, load_cache_kv, LookupKey, QueryParams, SequenceMetadata, BATCH_ENDPOINT, CONFIG,
ENTRY_ENDPOINT,
};
use base64::prelude::*;
use futures_util::future::{join_all, select, Either};
use static_ct_api::{PendingLogEntryTrait, StaticCTPendingLogEntry};
Expand All @@ -18,15 +21,6 @@ use tokio::sync::watch::{self, Sender};
#[allow(clippy::wildcard_imports)]
use worker::*;

// How many in-flight requests to allow. Tune to prevent the DO from being overloaded.
const MAX_IN_FLIGHT: usize = 900;

// The maximum number of requests to submit together in a batch.
const MAX_BATCH_SIZE: usize = 100;

// The maximum amount of time to wait before submitting a batch.
const MAX_BATCH_TIMEOUT_MILLIS: u64 = 1_000;

struct GenericBatcher<E: PendingLogEntryTrait> {
env: Env,
batch: Batch<E>,
Expand All @@ -50,7 +44,7 @@ impl DurableObject for Batcher {

// A batch of entries to be submitted to the Sequencer together.
struct Batch<E: PendingLogEntryTrait> {
pending_leaves: Vec<E>,
entries: Vec<E>,
by_hash: HashSet<LookupKey>,
done: Sender<HashMap<LookupKey, SequenceMetadata>>,
}
Expand All @@ -60,7 +54,7 @@ impl<E: PendingLogEntryTrait> Default for Batch<E> {
fn default() -> Self {
let (done, _) = watch::channel(HashMap::new());
Self {
pending_leaves: Vec::new(),
entries: Vec::new(),
by_hash: HashSet::new(),
done,
}
Expand All @@ -78,34 +72,32 @@ impl<E: PendingLogEntryTrait> GenericBatcher<E> {
}
async fn fetch(&mut self, mut req: Request) -> Result<Response> {
match req.path().as_str() {
"/add_leaf" => {
ENTRY_ENDPOINT => {
let name = &req.query::<QueryParams>()?.name;
let params = &CONFIG.logs[name];
let entry: E = req.json().await?;
let key = entry.lookup_key();

if self.in_flight >= MAX_IN_FLIGHT {
return Response::error("too many requests in flight", 429);
}
self.in_flight += 1;
self.processed += 1;

// Add entry to the current pending batch if it isn't already present.
// Rely on the Sequencer to deduplicate entries across batches.
if !self.batch.by_hash.contains(&key) {
self.batch.by_hash.insert(key);
self.batch.pending_leaves.push(entry);
self.batch.entries.push(entry);
}

let mut recv = self.batch.done.subscribe();

// Submit the current pending batch if it's full.
if self.batch.pending_leaves.len() >= MAX_BATCH_SIZE {
if self.batch.entries.len() >= params.max_batch_entries {
if let Err(e) = self.submit_batch(name).await {
log::warn!("{name} failed to submit full batch: {e}");
}
} else {
let batch_done = recv.changed();
let timeout = Delay::from(Duration::from_millis(MAX_BATCH_TIMEOUT_MILLIS));
let timeout = Delay::from(Duration::from_millis(params.batch_timeout_millis));
futures_util::pin_mut!(batch_done);
match select(batch_done, timeout).await {
Either::Left((batch_done, _timeout)) => {
Expand Down Expand Up @@ -135,8 +127,6 @@ impl<E: PendingLogEntryTrait> GenericBatcher<E> {
} else {
// Failed to sequence this entry, either due to an error
// submitting the batch or rate limiting at the Sequencer.
// The entry's batch could have also been dropped before
// this fetch task woke up and received the channel update.
Response::error("rate limited", 429)
};
self.in_flight -= 1;
Expand All @@ -158,17 +148,17 @@ impl<E: PendingLogEntryTrait> GenericBatcher<E> {

log::debug!(
"{name} submitting batch: leaves={} inflight={} processed={}",
batch.pending_leaves.len(),
batch.entries.len(),
self.in_flight,
self.processed,
);

// Submit the batch, and wait for it to be sequenced.
let req = Request::new_with_init(
&format!("http://fake_url.com/add_batch?name={name}"),
&format!("http://fake_url.com{BATCH_ENDPOINT}?name={name}"),
&RequestInit {
method: Method::Post,
body: Some(serde_json::to_string(&batch.pending_leaves)?.into()),
body: Some(serde_json::to_string(&batch.entries)?.into()),
..Default::default()
},
)?;
Expand Down
Loading