1,054 changes: 565 additions & 489 deletions Cargo.lock

Large diffs are not rendered by default.

38 changes: 33 additions & 5 deletions Cargo.toml
@@ -30,14 +30,42 @@ opt-level = 3
debug = 1

[workspace.dependencies]
anyhow = "1.0"
base64 = "0.22"
byteorder = "1.5"
chrono = { version = "0.4", features = ["serde"] }
console_error_panic_hook = "0.1.1"
console_log = { version = "1.0" }
criterion = { version = "0.5", features = ["html_reports"] }
der = "0.7.9"
ed25519-dalek = "2.1.1"
futures-executor = "0.3.31"
futures-util = "0.3.31"
getrandom = { version = "0.2", features = ["js"] }
hex = "0.4"
itertools = "0.14.0"
jsonschema = "0.30"
length_prefixed = { path = "crates/length_prefixed" }
libfuzzer-sys = "0.4"
log = { version = "0.4" }
p256 = { version = "0.13", features = ["ecdsa"] }
parking_lot = "0.12"
prometheus = "0.14"
rand = "0.8.5"
rand_core = "0.6.4"
serde = { version = "1.0", features = ["derive"] }
serde-wasm-bindgen = "0.6.5"
serde_bytes = "0.11"
serde_json = "1.0"
serde_with = { version = "3.9.0", features = ["base64"] }
sha2 = "0.10"
rand = "0.8.5"
signature = "2.2.0"
base64 = "0.21"
anyhow = "1.0"
ed25519-dalek = "2.1.1"
thiserror = "1.0"
signed_note = { path = "crates/signed_note", version = "0.2.0" }
static_ct_api = { path = "crates/static_ct_api", version = "0.2.0" }
thiserror = "2.0"
tlog_tiles = { path = "crates/tlog_tiles", version = "0.2.0" }
tokio = { version = "1", features = ["sync"] }
url = "2.2"
worker = "0.5.0"
x509-verify = { version = "0.4.4", features = ["md2", "md5", "sha1", "dsa", "rsa", "k256", "p192", "p224", "p256", "p384", "ecdsa", "ed25519", "x509", "pem"] }
x509_util = { path = "crates/x509_util" }
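
This manifest change follows Cargo's workspace dependency inheritance: each version is declared once in the root `[workspace.dependencies]` table, and member crates reference it with `workspace = true`. A minimal sketch of the pattern, with an illustrative `features` list (features named in a member crate are merged with any declared at the workspace level):

# Root Cargo.toml: the single authoritative version.
[workspace.dependencies]
serde = { version = "1.0", features = ["derive"] }

# Member crate's Cargo.toml: inherit the version, optionally add features.
[dependencies]
serde = { workspace = true, features = ["rc"] }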
54 changes: 28 additions & 26 deletions crates/ct_worker/Cargo.toml
@@ -27,47 +27,49 @@ wasm-opt = false
crate-type = ["cdylib"]

[build-dependencies]
serde.workspace = true
serde_json.workspace = true
jsonschema = "0.26.2"
static_ct_api = "0.2.0"
config = { path = "./config", package = "ct_worker_config" }
chrono.workspace = true
url = "2.5.4"
config = { path = "./config", package = "ct_worker_config" }
jsonschema.workspace = true
serde_json.workspace = true
serde.workspace = true
url.workspace = true
x509-verify.workspace = true

[dev-dependencies]
rand = { workspace = true, features = ["small_rng"] }
itertools = "0.13.0"
parking_lot = "0.11"
futures-executor = "0.3.31"
itertools.workspace = true
parking_lot.workspace = true
futures-executor.workspace = true

[dependencies]
anyhow.workspace = true
base64.workspace = true
byteorder = "1.4"
byteorder.workspace = true
config = { path = "./config", package = "ct_worker_config" }
console_error_panic_hook = "0.1.1"
console_log = { version = "1.0" }
ed25519-dalek = { workspace = true, features = ["pkcs8"] }
getrandom = { version = "0.2", features = ["js"] }
hex = "0.4"
log = { version = "0.4"}
signed_note = "0.2.0"
prometheus = "0.13.4"
p256 = { version = "0.13", features = ["ecdsa"] }
console_error_panic_hook.workspace = true
console_log.workspace = true
ed25519-dalek.workspace = true
futures-util.workspace = true
getrandom.workspace = true
hex.workspace = true
log.workspace = true
p256.workspace = true
prometheus.workspace = true
rand.workspace = true
serde-wasm-bindgen.workspace = true
serde.workspace = true
serde_bytes = "0.11"
serde_bytes.workspace = true
serde_json.workspace = true
serde_with.workspace = true
serde-wasm-bindgen = "0.6.5"
sha2.workspace = true
static_ct_api = "0.2.0"
signed_note.workspace = true
static_ct_api.workspace = true
thiserror.workspace = true
tlog_tiles = "0.2.0"
tokio = { version = "1", features = ["sync"] }
worker = { version = "0.5.0" }
futures-util = "0.3.31"
tlog_tiles.workspace = true
tokio.workspace = true
worker.workspace = true
x509-verify.workspace = true
x509_util.workspace = true

[lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(wasm_bindgen_unstable_test_coverage)'] }
3 changes: 2 additions & 1 deletion crates/ct_worker/build.rs
@@ -9,6 +9,7 @@ use serde_json::from_str;
use std::env;
use std::fs;
use url::Url;
use x509_verify::x509_cert::Certificate;

fn main() {
let env = env::var("DEPLOY_ENV").unwrap_or_else(|_| "dev".to_string());
@@ -64,7 +65,7 @@ fn main() {
roots_file = "default_roots.pem";
}
let roots =
static_ct_api::load_pem_chain(&fs::read(roots_file).expect("failed to read roots file"))
Certificate::load_pem_chain(&fs::read(roots_file).expect("failed to read roots file"))
.expect("unable to decode certificates");
assert!(!roots.is_empty(), "Roots file is empty");

8 changes: 4 additions & 4 deletions crates/ct_worker/config/src/lib.rs
@@ -27,18 +27,18 @@ pub struct LogParams {
pub submission_url: String,
pub temporal_interval: TemporalInterval,
pub location_hint: Option<String>,
#[serde(default = "default_pool_size")]
#[serde(default = "default_pool_size_seconds")]
pub pool_size: usize,
#[serde(default = "default_sequence_interval")]
#[serde(default = "default_sequence_interval_seconds")]
pub sequence_interval: u64,
}

// Limit on the number of entries per batch. Tune this parameter to avoid hitting various size limits.
// For instance, unexpectedly large leaves (e.g., with PQ signatures) could push us past the 128 MB Workers memory limit; at the default of 4000 entries, 10 KB certificates occupy 40 MB.
fn default_pool_size() -> usize {
fn default_pool_size_entries() -> usize {
4000
}

fn default_sequence_interval() -> u64 {
fn default_sequence_interval_seconds() -> u64 {
1
}
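
For readers unfamiliar with `#[serde(default = "...")]`: the named function is called only when the field is absent from the input, which is what lets deployments omit `pool_size` and `sequence_interval`. A minimal runnable sketch of that behavior, trimmed to the one field (the empty JSON input is illustrative):

use serde::Deserialize;

#[derive(Deserialize)]
struct LogParams {
    #[serde(default = "default_pool_size_entries")]
    pool_size: usize,
}

fn default_pool_size_entries() -> usize {
    4000
}

fn main() {
    // The field is absent, so serde falls back to the default fn:
    // pool_size == 4000 (about 40 MB at 10 KB per certificate).
    let params: LogParams = serde_json::from_str("{}").unwrap();
    assert_eq!(params.pool_size, 4000);
}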
18 changes: 7 additions & 11 deletions crates/ct_worker/src/batcher_do.rs
@@ -6,7 +6,7 @@
//!
//! Entries are assigned to Batcher shards with consistent hashing on the cache key.

use crate::{ctlog, get_stub, load_cache_kv, CacheKey, CacheValue, QueryParams};
use crate::{get_stub, load_cache_kv, LookupKey, QueryParams, SequenceMetadata};
use base64::prelude::*;
use futures_util::future::{join_all, select, Either};
use static_ct_api::LogEntry;
@@ -38,8 +38,8 @@ struct Batcher {
// A batch of entries to be submitted to the Sequencer together.
struct Batch {
pending_leaves: Vec<LogEntry>,
by_hash: HashSet<CacheKey>,
done: Sender<HashMap<CacheKey, CacheValue>>,
by_hash: HashSet<LookupKey>,
done: Sender<HashMap<LookupKey, SequenceMetadata>>,
}

impl Default for Batch {
@@ -69,11 +69,7 @@ impl DurableObject for Batcher {
"/add_leaf" => {
let name = &req.query::<QueryParams>()?.name;
let entry: LogEntry = req.json().await?;
let key = ctlog::compute_cache_hash(
entry.is_precert,
&entry.certificate,
&entry.issuer_key_hash,
);
let key = entry.lookup_key();

if self.in_flight >= MAX_IN_FLIGHT {
return Response::error("too many requests in flight", 429);
@@ -164,10 +160,10 @@ impl Batcher {
..Default::default()
},
)?;
let sequenced_entries: HashMap<CacheKey, CacheValue> = stub
let sequenced_entries: HashMap<LookupKey, SequenceMetadata> = stub
.fetch_with_request(req)
.await?
.json::<Vec<(CacheKey, CacheValue)>>()
.json::<Vec<(LookupKey, SequenceMetadata)>>()
.await?
.into_iter()
.collect();
@@ -182,7 +178,7 @@
.map(|(k, v)| {
kv.put(&BASE64_STANDARD.encode(k), "")
.unwrap()
.metadata::<CacheValue>(v)
.metadata::<SequenceMetadata>(v)
.unwrap()
.execute()
})
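The module doc above says entries are assigned to Batcher shards by consistent hashing on the cache key, but the assignment itself is outside this diff. A sketch of one plausible implementation using rendezvous (highest-random-weight) hashing, assuming `LookupKey` is the 16-byte SHA-256 prefix computed in ctlog.rs; `shard_for` and `num_shards` are hypothetical names:

use sha2::{Digest, Sha256};

type LookupKey = [u8; 16];

/// Hypothetical shard assignment: each (key, shard) pair gets a score
/// and the top-scoring shard wins, so changing the shard count only
/// remaps roughly 1/n of the keys.
fn shard_for(key: &LookupKey, num_shards: u32) -> u32 {
    (0..num_shards)
        .max_by_key(|shard| {
            let digest = Sha256::new()
                .chain_update(key)
                .chain_update(shard.to_be_bytes())
                .finalize();
            u64::from_be_bytes(digest[..8].try_into().unwrap())
        })
        .expect("num_shards must be non-zero")
}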
84 changes: 18 additions & 66 deletions crates/ct_worker/src/ctlog.rs
@@ -19,13 +19,11 @@
//! - [testlog_test.go](https://github.com/FiloSottile/sunlight/blob/36be227ff4599ac11afe3cec37a5febcd61da16a/internal/ctlog/testlog_test.go)

use crate::{
ctlog,
metrics::{millis_diff_as_secs, AsF64, Metrics},
util::now_millis,
CacheKey, CacheRead, CacheValue, CacheWrite, LockBackend, ObjectBackend,
CacheRead, CacheWrite, LockBackend, LookupKey, ObjectBackend, SequenceMetadata,
};
use anyhow::{anyhow, bail};
use byteorder::{BigEndian, WriteBytesExt};
use ed25519_dalek::SigningKey as Ed25519SigningKey;
use futures_util::future::try_join_all;
use log::{debug, error, info, trace, warn};
@@ -75,8 +73,8 @@ pub(crate) struct PoolState {
// in_sequencing is the [Pool::by_hash] map of the pool that's currently being
// sequenced. These entries might not be sequenced yet or might not yet be
// committed to the deduplication cache.
in_sequencing: HashMap<CacheKey, u64>,
in_sequencing_done: Option<Receiver<CacheValue>>,
in_sequencing: HashMap<LookupKey, u64>,
in_sequencing_done: Option<Receiver<SequenceMetadata>>,
}

// State owned by the sequencing loop.
@@ -285,15 +283,15 @@ impl SequenceState {
/// Result of an [`add_leaf_to_pool`] request containing either a cached log
/// entry or a pending entry that must be resolved.
pub(crate) enum AddLeafResult {
Cached(CacheValue),
Pending((u64, Receiver<CacheValue>)),
Cached(SequenceMetadata),
Pending((u64, Receiver<SequenceMetadata>)),
RateLimited,
}
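
A caller-side sketch of how these variants are typically consumed (`handle_add` is a hypothetical handler; only `resolve` and the types are from this file):

// Await the entry's (leaf_index, timestamp) metadata: immediately for
// a dedup-cache hit, after sequencing for a pending entry, and None
// when rate limited or when sequencing fails.
async fn handle_add(result: AddLeafResult) -> Option<SequenceMetadata> {
    result.resolve().await
}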

impl AddLeafResult {
/// Resolve an `AddLeafResult` to a leaf entry, or None if the
/// entry was not sequenced.
pub(crate) async fn resolve(self) -> Option<CacheValue> {
pub(crate) async fn resolve(self) -> Option<SequenceMetadata> {
match self {
AddLeafResult::Cached(entry) => Some(entry),
AddLeafResult::Pending((pool_index, mut rx)) => {
@@ -342,9 +340,9 @@ pub(crate) fn add_leaf_to_pool(
cache: &impl CacheRead,
leaf: &LogEntry,
) -> (AddLeafResult, AddLeafResultSource) {
let hash = compute_cache_hash(leaf.is_precert, &leaf.certificate, &leaf.issuer_key_hash);
let hash = leaf.lookup_key();
let pool_index: u64;
let rx: Receiver<CacheValue>;
let rx: Receiver<SequenceMetadata>;
let source: AddLeafResultSource;

if let Some(index) = state.in_sequencing.get(&hash) {
@@ -677,16 +675,7 @@ async fn sequence_pool(
.put_entries(
&sequenced_leaves
.iter()
.map(|entry| {
(
ctlog::compute_cache_hash(
entry.is_precert,
&entry.certificate,
&entry.issuer_key_hash,
),
(entry.leaf_index, entry.timestamp),
)
})
.map(|entry| (entry.lookup_key(), (entry.leaf_index, entry.timestamp)))
.collect::<Vec<_>>(),
)
.await
@@ -968,10 +957,10 @@ impl HashReader for HashReaderWithOverlay<'_> {
#[derive(Debug)]
struct Pool {
pending_leaves: Vec<LogEntry>,
by_hash: HashMap<CacheKey, u64>,
by_hash: HashMap<LookupKey, u64>,
// Sends the index of the first sequenced entry in the pool,
// and the pool's sequencing timestamp.
done: Sender<CacheValue>,
done: Sender<SequenceMetadata>,
}

impl Default for Pool {
@@ -986,46 +975,6 @@
}
}

/// Compute the cache key for a log entry.
pub(crate) fn compute_cache_hash(
is_precert: bool,
certificate: &[u8],
issuer_key_hash: &[u8; 32],
) -> CacheKey {
let mut buffer = Vec::new();
if is_precert {
// Add entry type = 1 (precert_entry)
buffer.write_u16::<BigEndian>(1).unwrap();

// Add issuer key hash
buffer.extend_from_slice(issuer_key_hash);

// Add certificate with a 24-bit length prefix
buffer
.write_uint::<BigEndian>(certificate.len() as u64, 3)
.unwrap();
buffer.extend_from_slice(certificate);
} else {
// Add entry type = 0 (x509_entry)
buffer.write_u16::<BigEndian>(0).unwrap();

// Add certificate with a 24-bit length prefix
buffer
.write_uint::<BigEndian>(certificate.len() as u64, 3)
.unwrap();
buffer.extend_from_slice(certificate);
}

// Compute the SHA-256 hash of the buffer
let hash = Sha256::digest(&buffer);

// Return the first 16 bytes of the hash as the cacheHash
let mut cache_hash = [0u8; 16];
cache_hash.copy_from_slice(&hash[..16]);

cache_hash
}

/// A pending upload.
#[derive(Debug, Serialize, Deserialize)]
struct UploadAction {
@@ -1093,7 +1042,7 @@ fn staging_path(mut tree_size: u64, tree_hash: &Hash) -> String {
#[cfg(test)]
mod tests {
use super::*;
use crate::{util, CacheKey, CacheValue};
use crate::{util, LookupKey, SequenceMetadata};
use futures_executor::block_on;
use itertools::Itertools;
use rand::{
@@ -1747,16 +1696,19 @@ mod tests {
}
}

struct TestCacheBackend(HashMap<CacheKey, CacheValue>);
struct TestCacheBackend(HashMap<LookupKey, SequenceMetadata>);

impl CacheRead for TestCacheBackend {
fn get_entry(&self, key: &CacheKey) -> Option<CacheValue> {
fn get_entry(&self, key: &LookupKey) -> Option<SequenceMetadata> {
self.0.get(key).copied()
}
}

impl CacheWrite for TestCacheBackend {
async fn put_entries(&mut self, entries: &[(CacheKey, CacheValue)]) -> worker::Result<()> {
async fn put_entries(
&mut self,
entries: &[(LookupKey, SequenceMetadata)],
) -> worker::Result<()> {
for (key, value) in entries {
if self.0.contains_key(key) {
continue;
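The `compute_cache_hash` helper deleted above now lives behind `LogEntry::lookup_key()`. A condensed sketch of the equivalent method, with the byte layout taken from the deleted code (the struct fields and method placement are assumed; in the PR the real types come from `static_ct_api` and the crate root):

use byteorder::{BigEndian, WriteBytesExt};
use sha2::{Digest, Sha256};

/// First 16 bytes of SHA-256 over the serialized entry.
type LookupKey = [u8; 16];
/// Presumed (leaf_index, timestamp), per `sequence_pool` above.
type SequenceMetadata = (u64, u64);

struct LogEntry {
    is_precert: bool,
    certificate: Vec<u8>,
    issuer_key_hash: [u8; 32],
}

impl LogEntry {
    fn lookup_key(&self) -> LookupKey {
        let mut buf = Vec::new();
        // Entry type: 1 = precert_entry, 0 = x509_entry.
        buf.write_u16::<BigEndian>(u16::from(self.is_precert)).unwrap();
        // Precerts also bind the issuer key hash.
        if self.is_precert {
            buf.extend_from_slice(&self.issuer_key_hash);
        }
        // Certificate with a 24-bit big-endian length prefix.
        buf.write_uint::<BigEndian>(self.certificate.len() as u64, 3)
            .unwrap();
        buf.extend_from_slice(&self.certificate);
        // Truncate the SHA-256 digest to 16 bytes.
        let mut key = [0u8; 16];
        key.copy_from_slice(&Sha256::digest(&buf)[..16]);
        key
    }
}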