Skip to content

Commit d4370ab

Browse files
committed
Add wshim support
1 parent dd4b0e6 commit d4370ab

File tree

16 files changed

+338
-103
lines changed

16 files changed

+338
-103
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/ct_worker/src/batcher_do.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ impl DurableObject for Batcher {
2525
enable_dedup: params.enable_dedup,
2626
location_hint: params.location_hint.clone(),
2727
};
28-
Batcher(GenericBatcher::new(env, config))
28+
Batcher(GenericBatcher::new(state, env, config))
2929
}
3030

3131
async fn fetch(&self, req: Request) -> Result<Response> {

crates/ct_worker/src/cleaner_do.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ impl DurableObject for Cleaner {
3535
clean_interval: Duration::from_secs(params.clean_interval_secs),
3636
};
3737

38-
Cleaner(GenericCleaner::new(&state, &env, config))
38+
Cleaner(GenericCleaner::new(state, &env, config))
3939
}
4040

4141
async fn fetch(&self, req: Request) -> Result<Response> {

crates/ct_worker/src/frontend_worker.rs

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ use crate::{load_roots, load_signing_key, SequenceMetadata, CONFIG};
77
use config::TemporalInterval;
88
use generic_log_worker::{
99
batcher_id_from_lookup_key, deserialize, get_cached_metadata, get_durable_object_stub,
10-
init_logging, load_cache_kv, load_public_bucket, put_cache_entry_metadata, serialize,
11-
ObjectBucket, ENTRY_ENDPOINT, METRICS_ENDPOINT,
10+
init_logging, load_cache_kv, load_public_bucket, obs::Wshim, put_cache_entry_metadata,
11+
serialize, ObjectBucket, ENTRY_ENDPOINT,
1212
};
1313
use p256::pkcs8::EncodePublicKey;
1414
use serde::Serialize;
@@ -62,9 +62,10 @@ fn start() {
6262
///
6363
/// Panics if there are issues parsing route parameters, which should never happen.
6464
#[event(fetch, respond_with_errors)]
65-
async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
65+
async fn main(req: Request, env: Env, ctx: Context) -> Result<Response> {
66+
let wshim = Wshim::from_env(&env);
6667
// Use an outer router as middleware to check that the log name is valid.
67-
Router::new()
68+
let response = Router::new()
6869
.or_else_any_method_async("/logs/:log/*route", |req, ctx| async move {
6970
let name = if let Some(name) = ctx.param("log") {
7071
if CONFIG.logs.contains_key(name) {
@@ -117,18 +118,6 @@ async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
117118
temporal_interval: &params.temporal_interval,
118119
})
119120
})
120-
.get_async("/logs/:log/metrics", |_req, ctx| async move {
121-
let name = ctx.data;
122-
let stub = get_durable_object_stub(
123-
&ctx.env,
124-
name,
125-
None,
126-
"SEQUENCER",
127-
CONFIG.logs[name].location_hint.as_deref(),
128-
)?;
129-
stub.fetch_with_str(&format!("http://fake_url.com{METRICS_ENDPOINT}"))
130-
.await
131-
})
132121
.get("/logs/:log/sequencer_id", |_req, ctx| {
133122
// Print out the Durable Object ID of the sequencer to allow
134123
// looking it up in internal Cloudflare dashboards. This
@@ -181,7 +170,11 @@ async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
181170
log::warn!("Internal error: {e}");
182171
Response::error("Internal error", 500)
183172
}
184-
})
173+
});
174+
if let Ok(wshim) = wshim {
175+
ctx.wait_until(async move { wshim.flush(&generic_log_worker::obs::logs::LOGGER).await });
176+
}
177+
response
185178
}
186179

187180
#[allow(clippy::too_many_lines)]

crates/generic_log_worker/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,14 @@ console_log.workspace = true
2929
ed25519-dalek.workspace = true
3030
futures-util.workspace = true
3131
hex.workspace = true
32-
log.workspace = true
32+
log = { workspace = true, features = ["kv"] }
3333
p256.workspace = true
3434
prometheus.workspace = true
3535
rand.workspace = true
3636
serde-wasm-bindgen.workspace = true
3737
serde.workspace = true
3838
serde_bytes.workspace = true
39+
serde_json.workspace = true
3940
sha2.workspace = true
4041
signed_note.workspace = true
4142
thiserror.workspace = true

crates/generic_log_worker/src/batcher_do.rs

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77
//! Entries are assigned to Batcher shards with consistent hashing on the cache key.
88
99
use crate::{
10-
deserialize, get_durable_object_stub, load_cache_kv, serialize, LookupKey, SequenceMetadata,
11-
BATCH_ENDPOINT, ENTRY_ENDPOINT, SEQUENCER_BINDING,
10+
deserialize, get_durable_object_stub, load_cache_kv, obs, serialize, LookupKey,
11+
SequenceMetadata, BATCH_ENDPOINT, ENTRY_ENDPOINT, SEQUENCER_BINDING,
1212
};
1313
use base64::prelude::*;
1414
use futures_util::future::{join_all, select, Either};
@@ -26,10 +26,12 @@ use worker::*;
2626
pub struct GenericBatcher {
2727
env: Env,
2828
config: BatcherConfig,
29+
state: State,
2930
kv: Option<KvStore>,
3031
batch: RefCell<Batch>,
3132
in_flight: RefCell<usize>,
3233
processed: RefCell<usize>,
34+
wshim: Option<obs::Wshim>,
3335
}
3436

3537
pub struct BatcherConfig {
@@ -64,20 +66,24 @@ impl GenericBatcher {
6466
/// # Panics
6567
///
6668
/// Panics if we can't get a handle for the sequencer or KV store.
67-
pub fn new(env: Env, config: BatcherConfig) -> Self {
69+
pub fn new(state: State, env: Env, config: BatcherConfig) -> Self {
6870
let kv = if config.enable_dedup {
6971
Some(load_cache_kv(&env, &config.name).unwrap())
7072
} else {
7173
None
7274
};
7375

76+
let wshim = obs::Wshim::from_env(&env).ok();
77+
7478
Self {
7579
env,
7680
config,
81+
state,
7782
kv,
7883
batch: RefCell::new(Batch::default()),
7984
in_flight: RefCell::new(0),
8085
processed: RefCell::new(0),
86+
wshim,
8187
}
8288
}
8389

@@ -89,7 +95,11 @@ impl GenericBatcher {
8995
///
9096
/// Returns an error if the request cannot be parsed or response cannot be
9197
/// constructed.
92-
pub async fn fetch(&self, mut req: Request) -> Result<Response> {
98+
pub async fn fetch(&self, req: Request) -> Result<Response> {
99+
self.with_obs(async || self.fetch_impl(req).await).await
100+
}
101+
102+
async fn fetch_impl(&self, mut req: Request) -> Result<Response> {
93103
match req.path().as_str() {
94104
ENTRY_ENDPOINT => {
95105
let entry: PendingLogEntryBlob = deserialize(&req.bytes().await?)?;
@@ -163,6 +173,19 @@ impl GenericBatcher {
163173
_ => Response::error("not found", 404),
164174
}
165175
}
176+
177+
async fn with_obs<F, R>(&self, f: F) -> R
178+
where
179+
F: AsyncFnOnce() -> R,
180+
{
181+
let r = f().await;
182+
if let Some(wshim) = self.wshim.clone() {
183+
self.state.wait_until(async move {
184+
wshim.flush(&obs::logs::LOGGER).await;
185+
});
186+
}
187+
r
188+
}
166189
}
167190

168191
impl GenericBatcher {

crates/generic_log_worker/src/cleaner_do.rs

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use std::{cell::RefCell, mem, time::Duration};
99
use tlog_tiles::{PathElem, TlogTile};
1010
use worker::{Bucket, Env, Error as WorkerError, Object, Request, Response, State, Storage};
1111

12-
use crate::{load_public_bucket, log_ops::CHECKPOINT_KEY, util::now_millis};
12+
use crate::{load_public_bucket, log_ops::CHECKPOINT_KEY, obs, util::now_millis};
1313

1414
// Workers are limited to 1000 subrequests per invocation (including R2 operations).
1515
// For each log, we'll need to perform the following subrequests:
@@ -36,13 +36,15 @@ pub struct CleanerConfig {
3636
}
3737

3838
pub struct GenericCleaner {
39+
state: State,
3940
config: CleanerConfig,
4041
storage: Storage,
4142
bucket: Bucket,
4243
cleaned_size: RefCell<u64>,
4344
current_size: RefCell<u64>,
4445
subrequests: RefCell<usize>,
4546
initialized: RefCell<bool>,
47+
wshim: Option<obs::Wshim>,
4648
}
4749

4850
impl GenericCleaner {
@@ -51,16 +53,19 @@ impl GenericCleaner {
5153
/// # Panics
5254
///
5355
/// Panics if we can't get a handle for the public bucket.
54-
pub fn new(state: &State, env: &Env, config: CleanerConfig) -> Self {
56+
pub fn new(state: State, env: &Env, config: CleanerConfig) -> Self {
5557
let bucket = load_public_bucket(env, &config.name).unwrap();
58+
let wshim = obs::Wshim::from_env(env).ok();
5659
Self {
5760
storage: state.storage(),
61+
state,
5862
config,
5963
bucket,
6064
cleaned_size: RefCell::new(0),
6165
current_size: RefCell::new(0),
6266
subrequests: RefCell::new(0),
6367
initialized: RefCell::new(false),
68+
wshim,
6469
}
6570
}
6671

@@ -96,10 +101,13 @@ impl GenericCleaner {
96101
/// # Errors
97102
/// Will return an error if initialization fails.
98103
pub async fn fetch(&self, _req: Request) -> Result<Response, WorkerError> {
99-
if !*self.initialized.borrow() {
100-
self.initialize().await?;
101-
}
102-
Response::ok("Started cleaner")
104+
self.with_obs(async || {
105+
if !*self.initialized.borrow() {
106+
self.initialize().await?;
107+
}
108+
Response::ok("Started cleaner")
109+
})
110+
.await
103111
}
104112

105113
/// Alarm handler for the partial tile cleaner. This runs in a loop
@@ -109,6 +117,10 @@ impl GenericCleaner {
109117
/// # Errors
110118
/// Will return an error if initialization or cleaning fails.
111119
pub async fn alarm(&self) -> Result<Response, WorkerError> {
120+
self.with_obs(async || self.alarm_impl().await).await
121+
}
122+
123+
async fn alarm_impl(&self) -> Result<Response, WorkerError> {
112124
// Reset the subrequest count.
113125
*self.subrequests.borrow_mut() = 0;
114126

@@ -293,4 +305,17 @@ impl GenericCleaner {
293305
*self.subrequests.borrow_mut() += new;
294306
Ok(())
295307
}
308+
309+
async fn with_obs<F, R>(&self, f: F) -> R
310+
where
311+
F: AsyncFnOnce() -> R,
312+
{
313+
let r = f().await;
314+
if let Some(wshim) = self.wshim.clone() {
315+
self.state.wait_until(async move {
316+
wshim.flush(&obs::logs::LOGGER).await;
317+
});
318+
}
319+
r
320+
}
296321
}

crates/generic_log_worker/src/lib.rs

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use base64::Engine;
77
pub mod batcher_do;
88
pub mod cleaner_do;
99
pub mod log_ops;
10-
mod metrics;
10+
pub mod obs;
1111
pub mod sequencer_do;
1212
pub mod util;
1313

@@ -17,9 +17,9 @@ pub use log_ops::upload_issuers;
1717
pub use sequencer_do::*;
1818

1919
use byteorder::{BigEndian, WriteBytesExt};
20-
use log::{error, Level};
20+
use log::error;
2121
use log_ops::UploadOptions;
22-
use metrics::{millis_diff_as_secs, AsF64, ObjectMetrics};
22+
use obs::metrics::{millis_diff_as_secs, AsF64, ObjectMetrics};
2323
use serde::de::DeserializeOwned;
2424
use serde::{Deserialize, Serialize};
2525
use serde_bytes::ByteBuf;
@@ -28,8 +28,6 @@ use std::cell::RefCell;
2828
use std::collections::btree_map::Entry;
2929
use std::collections::{BTreeMap, HashMap, VecDeque};
3030
use std::io::Write;
31-
use std::str::FromStr;
32-
use std::sync::Once;
3331
use tlog_tiles::{LookupKey, PendingLogEntry, SequenceMetadata};
3432
use tokio::sync::Mutex;
3533
use util::now_millis;
@@ -44,9 +42,6 @@ pub const CLEANER_BINDING: &str = "CLEANER";
4442

4543
const BATCH_ENDPOINT: &str = "/add_batch";
4644
pub const ENTRY_ENDPOINT: &str = "/add_entry";
47-
pub const METRICS_ENDPOINT: &str = "/metrics";
48-
49-
static INIT_LOGGING: Once = Once::new();
5045

5146
/// Initialize logging and panic handling for the Worker. This should be called
5247
/// in the Worker's start event handler.
@@ -56,12 +51,7 @@ static INIT_LOGGING: Once = Once::new();
5651
/// Panics if the logger has already been initialized, which should never happen
5752
/// due to the use of `sync::Once`.
5853
pub fn init_logging(level: Option<&str>) {
59-
let level = level
60-
.and_then(|level| Level::from_str(level).ok())
61-
.unwrap_or(Level::Info);
62-
INIT_LOGGING.call_once(|| {
63-
console_log::init_with_level(level).expect("error initializing logger");
64-
});
54+
obs::logs::init(level);
6555
}
6656

6757
/// Wrapper around `bitcode::serialize`.

crates/generic_log_worker/src/log_ops.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
//! - [testlog_test.go](https://github.com/FiloSottile/sunlight/blob/36be227ff4599ac11afe3cec37a5febcd61da16a/internal/ctlog/testlog_test.go)
2020
2121
use crate::{
22-
metrics::{millis_diff_as_secs, AsF64, SequencerMetrics},
22+
obs::metrics::{millis_diff_as_secs, AsF64, SequencerMetrics},
2323
util::now_millis,
2424
CacheRead, CacheWrite, LockBackend, LookupKey, ObjectBackend, SequenceMetadata,
2525
SequencerConfig,

0 commit comments

Comments
 (0)