1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion crates/ct_worker/src/batcher_do.rs
@@ -25,7 +25,7 @@ impl DurableObject for Batcher {
enable_dedup: params.enable_dedup,
location_hint: params.location_hint.clone(),
};
- Batcher(GenericBatcher::new(env, config))
+ Batcher(GenericBatcher::new(state, env, config))
}

async fn fetch(&self, req: Request) -> Result<Response> {
2 changes: 1 addition & 1 deletion crates/ct_worker/src/cleaner_do.rs
@@ -35,7 +35,7 @@ impl DurableObject for Cleaner {
clean_interval: Duration::from_secs(params.clean_interval_secs),
};

- Cleaner(GenericCleaner::new(&state, &env, config))
+ Cleaner(GenericCleaner::new(state, &env, config))
}

async fn fetch(&self, req: Request) -> Result<Response> {
27 changes: 10 additions & 17 deletions crates/ct_worker/src/frontend_worker.rs
@@ -7,8 +7,8 @@ use crate::{load_roots, load_signing_key, SequenceMetadata, CONFIG};
use config::TemporalInterval;
use generic_log_worker::{
batcher_id_from_lookup_key, deserialize, get_cached_metadata, get_durable_object_stub,
- init_logging, load_cache_kv, load_public_bucket, put_cache_entry_metadata, serialize,
- ObjectBucket, ENTRY_ENDPOINT, METRICS_ENDPOINT,
+ init_logging, load_cache_kv, load_public_bucket, obs::Wshim, put_cache_entry_metadata,
+ serialize, ObjectBucket, ENTRY_ENDPOINT,
};
use p256::pkcs8::EncodePublicKey;
use serde::Serialize;
@@ -62,9 +62,10 @@ fn start() {
///
/// Panics if there are issues parsing route parameters, which should never happen.
#[event(fetch, respond_with_errors)]
- async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
+ async fn main(req: Request, env: Env, ctx: Context) -> Result<Response> {
+ let wshim = Wshim::from_env(&env);
// Use an outer router as middleware to check that the log name is valid.
- Router::new()
+ let response = Router::new()
.or_else_any_method_async("/logs/:log/*route", |req, ctx| async move {
let name = if let Some(name) = ctx.param("log") {
if CONFIG.logs.contains_key(name) {
Expand Down Expand Up @@ -117,18 +118,6 @@ async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
temporal_interval: &params.temporal_interval,
})
})
.get_async("/logs/:log/metrics", |_req, ctx| async move {
let name = ctx.data;
let stub = get_durable_object_stub(
&ctx.env,
name,
None,
"SEQUENCER",
CONFIG.logs[name].location_hint.as_deref(),
)?;
stub.fetch_with_str(&format!("http://fake_url.com{METRICS_ENDPOINT}"))
.await
})
.get("/logs/:log/sequencer_id", |_req, ctx| {
// Print out the Durable Object ID of the sequencer to allow
// looking it up in internal Cloudflare dashboards. This
@@ -181,7 +170,11 @@ async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
log::warn!("Internal error: {e}");
Response::error("Internal error", 500)
}
- })
+ });
+ if let Ok(wshim) = wshim {
+ ctx.wait_until(async move { wshim.flush(&generic_log_worker::obs::logs::LOGGER).await });
+ }
+ response
}

#[allow(clippy::too_many_lines)]
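The hunk above builds the `Response` first and only then hands the observability flush to the runtime, so flushing can never delay the reply. A minimal sketch of that deferred-flush pattern in a `workers-rs` fetch handler; `flush_logs` here is a stand-in for `Wshim::flush`, not an API from this PR:

```rust
use worker::*;

// Stand-in for Wshim::flush: ship any buffered log records to an external sink.
async fn flush_logs() {}

#[event(fetch)]
async fn main(req: Request, env: Env, ctx: Context) -> Result<Response> {
    // Build the response as usual.
    let response = Router::new()
        .get("/", |_req, _ctx| Response::ok("hello"))
        .run(req, env)
        .await;
    // wait_until keeps the invocation alive so the flush can finish after the
    // response has been returned, instead of blocking the caller on it.
    ctx.wait_until(async move { flush_logs().await });
    response
}
```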
6 changes: 3 additions & 3 deletions crates/ct_worker/wrangler.jsonc
@@ -17,7 +17,7 @@
"build": {
// Change '--release' to '--dev' to compile with debug symbols.
// DEPLOY_ENV is used in build.rs to select per-environment config and roots.
"command": "cargo install -q worker-build && DEPLOY_ENV=dev worker-build --release"
"command": "cargo install -q worker-build@0.1.14 && DEPLOY_ENV=dev worker-build --release"
},
"route": {
"pattern": "static-ct-dev.cloudflareresearch.com",
@@ -117,7 +117,7 @@
"build": {
// Change '--release' to '--dev' to compile with debug symbols.
// DEPLOY_ENV is used in build.rs to select per-environment config and roots.
"command": "cargo install -q worker-build && DEPLOY_ENV=cftest worker-build --release"
"command": "cargo install -q worker-build@0.1.14 && DEPLOY_ENV=cftest worker-build --release"
},
"route": {
"pattern": "static-ct.cloudflareresearch.com",
@@ -185,7 +185,7 @@
"build": {
// Change '--release' to '--dev' to compile with debug symbols.
// DEPLOY_ENV is used in build.rs to select per-environment config and roots.
"command": "cargo install -q worker-build && DEPLOY_ENV=raio worker-build --release"
"command": "cargo install -q worker-build@0.1.14 && DEPLOY_ENV=raio worker-build --release"
},
"routes": [
"https://ct.cloudflare.com/logs/raio2025h2b/*",
3 changes: 2 additions & 1 deletion crates/generic_log_worker/Cargo.toml
@@ -29,13 +29,14 @@ console_log.workspace = true
ed25519-dalek.workspace = true
futures-util.workspace = true
hex.workspace = true
- log.workspace = true
+ log = { workspace = true, features = ["kv"] }
p256.workspace = true
prometheus.workspace = true
rand.workspace = true
serde-wasm-bindgen.workspace = true
serde.workspace = true
serde_bytes.workspace = true
+ serde_json.workspace = true
sha2.workspace = true
signed_note.workspace = true
thiserror.workspace = true
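Enabling the `log` crate's `kv` feature (together with the new `serde_json` dependency) lets log records carry structured key-value pairs instead of only a formatted message. A small sketch of what the feature allows; the field names are illustrative, not taken from this PR:

```rust
use log::info;

fn report_batch(batch_size: usize, duration_ms: u64) {
    // With the `kv` feature, the pairs before the `;` are attached to the
    // record as structured data that a custom logger can serialize to JSON.
    info!(batch_size = batch_size, duration_ms = duration_ms; "sequenced batch");
}
```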
31 changes: 27 additions & 4 deletions crates/generic_log_worker/src/batcher_do.rs
@@ -7,8 +7,8 @@
//! Entries are assigned to Batcher shards with consistent hashing on the cache key.

use crate::{
- deserialize, get_durable_object_stub, load_cache_kv, serialize, LookupKey, SequenceMetadata,
- BATCH_ENDPOINT, ENTRY_ENDPOINT, SEQUENCER_BINDING,
+ deserialize, get_durable_object_stub, load_cache_kv, obs, serialize, LookupKey,
+ SequenceMetadata, BATCH_ENDPOINT, ENTRY_ENDPOINT, SEQUENCER_BINDING,
};
use base64::prelude::*;
use futures_util::future::{join_all, select, Either};
@@ -26,10 +26,12 @@ use worker::*;
pub struct GenericBatcher {
env: Env,
config: BatcherConfig,
+ state: State,
kv: Option<KvStore>,
batch: RefCell<Batch>,
in_flight: RefCell<usize>,
processed: RefCell<usize>,
+ wshim: Option<obs::Wshim>,
}

pub struct BatcherConfig {
@@ -64,20 +66,24 @@ impl GenericBatcher {
/// # Panics
///
/// Panics if we can't get a handle for the sequencer or KV store.
- pub fn new(env: Env, config: BatcherConfig) -> Self {
+ pub fn new(state: State, env: Env, config: BatcherConfig) -> Self {
let kv = if config.enable_dedup {
Some(load_cache_kv(&env, &config.name).unwrap())
} else {
None
};

+ let wshim = obs::Wshim::from_env(&env).ok();
+
Self {
env,
config,
+ state,
kv,
batch: RefCell::new(Batch::default()),
in_flight: RefCell::new(0),
processed: RefCell::new(0),
+ wshim,
}
}

@@ -89,7 +95,11 @@ impl GenericBatcher {
///
/// Returns an error if the request cannot be parsed or response cannot be
/// constructed.
- pub async fn fetch(&self, mut req: Request) -> Result<Response> {
+ pub async fn fetch(&self, req: Request) -> Result<Response> {
+ self.with_obs(async || self.fetch_impl(req).await).await
+ }
+
+ async fn fetch_impl(&self, mut req: Request) -> Result<Response> {
match req.path().as_str() {
ENTRY_ENDPOINT => {
let entry: PendingLogEntryBlob = deserialize(&req.bytes().await?)?;
@@ -163,6 +173,19 @@ impl GenericBatcher {
_ => Response::error("not found", 404),
}
}
+
+ async fn with_obs<F, R>(&self, f: F) -> R
+ where
+ F: AsyncFnOnce() -> R,
+ {
+ let r = f().await;
+ if let Some(wshim) = self.wshim.clone() {
+ self.state.wait_until(async move {
+ wshim.flush(&obs::logs::LOGGER).await;
+ });
+ }
+ r
+ }
}

impl GenericBatcher {
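The new `with_obs` helper takes the handler as an `async ||` closure bounded by `AsyncFnOnce`, which lets it await a closure such as `async || self.fetch_impl(req).await`; async closures and the `AsyncFn*` traits are stable as of Rust 1.85. A toy illustration of that bound, independent of the Worker types:

```rust
// Run an async closure, then a synchronous teardown step, and return the
// closure's result: the same shape as with_obs, minus the wait_until call.
async fn run_then<R>(f: impl AsyncFnOnce() -> R, teardown: impl FnOnce()) -> R {
    let r = f().await;
    teardown();
    r
}

async fn demo() -> u32 {
    let data = vec![1u32, 2, 3];
    // The async closure borrows `data` from the enclosing scope.
    run_then(async || data.iter().sum::<u32>(), || println!("flushed")).await
}
```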
37 changes: 31 additions & 6 deletions crates/generic_log_worker/src/cleaner_do.rs
@@ -9,7 +9,7 @@ use std::{cell::RefCell, mem, time::Duration};
use tlog_tiles::{PathElem, TlogTile};
use worker::{Bucket, Env, Error as WorkerError, Object, Request, Response, State, Storage};

- use crate::{load_public_bucket, log_ops::CHECKPOINT_KEY, util::now_millis};
+ use crate::{load_public_bucket, log_ops::CHECKPOINT_KEY, obs, util::now_millis};

// Workers are limited to 1000 subrequests per invocation (including R2 operations).
// For each log, we'll need to perform the following subrequests:
@@ -36,13 +36,15 @@ pub struct CleanerConfig {
}

pub struct GenericCleaner {
+ state: State,
config: CleanerConfig,
storage: Storage,
bucket: Bucket,
cleaned_size: RefCell<u64>,
current_size: RefCell<u64>,
subrequests: RefCell<usize>,
initialized: RefCell<bool>,
+ wshim: Option<obs::Wshim>,
}

impl GenericCleaner {
@@ -51,16 +53,19 @@ impl GenericCleaner {
/// # Panics
///
/// Panics if we can't get a handle for the public bucket.
- pub fn new(state: &State, env: &Env, config: CleanerConfig) -> Self {
+ pub fn new(state: State, env: &Env, config: CleanerConfig) -> Self {
let bucket = load_public_bucket(env, &config.name).unwrap();
+ let wshim = obs::Wshim::from_env(env).ok();
Self {
storage: state.storage(),
+ state,
config,
bucket,
cleaned_size: RefCell::new(0),
current_size: RefCell::new(0),
subrequests: RefCell::new(0),
initialized: RefCell::new(false),
+ wshim,
}
}

@@ -96,10 +101,13 @@ impl GenericCleaner {
/// # Errors
/// Will return an error if initialization fails.
pub async fn fetch(&self, _req: Request) -> Result<Response, WorkerError> {
- if !*self.initialized.borrow() {
- self.initialize().await?;
- }
- Response::ok("Started cleaner")
+ self.with_obs(async || {
+ if !*self.initialized.borrow() {
+ self.initialize().await?;
+ }
+ Response::ok("Started cleaner")
+ })
+ .await
}

/// Alarm handler for the partial tile cleaner. This runs in a loop
@@ -109,6 +117,10 @@
/// # Errors
/// Will return an error if initialization or cleaning fails.
pub async fn alarm(&self) -> Result<Response, WorkerError> {
+ self.with_obs(async || self.alarm_impl().await).await
+ }
+
+ async fn alarm_impl(&self) -> Result<Response, WorkerError> {
// Reset the subrequest count.
*self.subrequests.borrow_mut() = 0;

@@ -293,4 +305,17 @@ impl GenericCleaner {
*self.subrequests.borrow_mut() += new;
Ok(())
}
+
+ async fn with_obs<F, R>(&self, f: F) -> R
+ where
+ F: AsyncFnOnce() -> R,
+ {
+ let r = f().await;
+ if let Some(wshim) = self.wshim.clone() {
+ self.state.wait_until(async move {
+ wshim.flush(&obs::logs::LOGGER).await;
+ });
+ }
+ r
+ }
}
18 changes: 4 additions & 14 deletions crates/generic_log_worker/src/lib.rs
@@ -7,7 +7,7 @@ use base64::Engine;
pub mod batcher_do;
pub mod cleaner_do;
pub mod log_ops;
- mod metrics;
+ pub mod obs;
pub mod sequencer_do;
pub mod util;

@@ -17,9 +17,9 @@ pub use log_ops::upload_issuers;
pub use sequencer_do::*;

use byteorder::{BigEndian, WriteBytesExt};
- use log::{error, Level};
+ use log::error;
use log_ops::UploadOptions;
- use metrics::{millis_diff_as_secs, AsF64, ObjectMetrics};
+ use obs::metrics::{millis_diff_as_secs, AsF64, ObjectMetrics};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
@@ -28,8 +28,6 @@ use std::cell::RefCell;
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::io::Write;
- use std::str::FromStr;
- use std::sync::Once;
use tlog_tiles::{LookupKey, PendingLogEntry, SequenceMetadata};
use tokio::sync::Mutex;
use util::now_millis;
@@ -44,9 +42,6 @@ pub const CLEANER_BINDING: &str = "CLEANER";

const BATCH_ENDPOINT: &str = "/add_batch";
pub const ENTRY_ENDPOINT: &str = "/add_entry";
- pub const METRICS_ENDPOINT: &str = "/metrics";
-
- static INIT_LOGGING: Once = Once::new();

/// Initialize logging and panic handling for the Worker. This should be called
/// in the Worker's start event handler.
@@ -56,12 +51,7 @@ static INIT_LOGGING: Once = Once::new();
/// Panics if the logger has already been initialized, which should never happen
/// due to the use of `sync::Once`.
pub fn init_logging(level: Option<&str>) {
- let level = level
- .and_then(|level| Level::from_str(level).ok())
- .unwrap_or(Level::Info);
- INIT_LOGGING.call_once(|| {
- console_log::init_with_level(level).expect("error initializing logger");
- });
+ obs::logs::init(level);
}

/// Wrapper around `bitcode::serialize`.
2 changes: 1 addition & 1 deletion crates/generic_log_worker/src/log_ops.rs
@@ -19,7 +19,7 @@
//! - [testlog_test.go](https://github.com/FiloSottile/sunlight/blob/36be227ff4599ac11afe3cec37a5febcd61da16a/internal/ctlog/testlog_test.go)

use crate::{
- metrics::{millis_diff_as_secs, AsF64, SequencerMetrics},
+ obs::metrics::{millis_diff_as_secs, AsF64, SequencerMetrics},
util::now_millis,
CacheRead, CacheWrite, LockBackend, LookupKey, ObjectBackend, SequenceMetadata,
SequencerConfig,