Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 101 additions & 90 deletions crates/ct_worker/src/frontend_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

use std::sync::LazyLock;

use crate::{load_signing_key, load_witness_key, LookupKey, SequenceMetadata, CONFIG, ROOTS};
use crate::{load_signing_key, LookupKey, SequenceMetadata, CONFIG, ROOTS};
use config::TemporalInterval;
use futures_util::future::try_join_all;
use generic_log_worker::{
Expand All @@ -24,17 +24,18 @@ use tlog_tiles::{LogEntry, PendingLogEntry};
use worker::*;

// The Maximum Merge Delay (MMD) of a log indicates the maximum period of time
// between when a SCT is issued and the corresponding entry is sequenced
// in the log. For static CT logs, this is effectively zero since SCT issuance
// happens only once the entry is sequenced. However, we can leave this value
// in the metadata as the default (1 day).
const MAX_MERGE_DELAY: usize = 86_400;
// between when a SCT is issued and the corresponding entry is sequenced in the
// log. For Azul-based logs, this is effectively zero since SCT issuance happens
// only once the entry is sequenced. However, we can leave this value as the
// maximum allowed in Chrome's policy, 60 seconds, to allow future flexibility.
// For details, see https://github.com/C2SP/C2SP/issues/79.
const MAX_MERGE_DELAY_SECS: usize = 60;

const UNKNOWN_LOG_MSG: &str = "unknown log";

#[serde_as]
#[derive(Serialize)]
struct MetadataResponse<'a> {
struct LogV3JsonResponse<'a> {
#[serde(skip_serializing_if = "Option::is_none")]
description: &'a Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
Expand All @@ -43,8 +44,6 @@ struct MetadataResponse<'a> {
log_id: &'a [u8],
#[serde_as(as = "Base64")]
key: &'a [u8],
#[serde_as(as = "Base64")]
witness_key: &'a [u8],
mmd: usize,
submission_url: &'a str,
monitoring_url: &'a str,
Expand All @@ -68,77 +67,101 @@ fn start() {
/// Panics if there are issues parsing route parameters, which should never happen.
#[event(fetch, respond_with_errors)]
async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
let router = Router::new();
router
.get("/logs/:log/ct/v1/get-roots", |_req, ctx| {
let _name = valid_log_name(&ctx)?;
Response::from_json(&GetRootsResponse {
certificates: x509_util::certs_to_bytes(&ROOTS.certs).unwrap(),
})
})
.post_async("/logs/:log/ct/v1/add-chain", |req, ctx| async move {
add_chain_or_pre_chain(req, &ctx.env, valid_log_name(&ctx)?, false).await
})
.post_async("/logs/:log/ct/v1/add-pre-chain", |req, ctx| async move {
add_chain_or_pre_chain(req, &ctx.env, valid_log_name(&ctx)?, true).await
})
.get("/logs/:log/metadata", |_req, ctx| {
let name = valid_log_name(&ctx)?;
let params = &CONFIG.logs[name];
let verifying_key = load_signing_key(&ctx.env, name)?.verifying_key();
let log_id =
&static_ct_api::log_id_from_key(verifying_key).map_err(|e| e.to_string())?;
let key = verifying_key
.to_public_key_der()
.map_err(|e| e.to_string())?;
let witness_key = load_witness_key(&ctx.env, name)?;
let witness_key = witness_key
.verifying_key()
.to_public_key_der()
.map_err(|e| e.to_string())?;
Response::from_json(&MetadataResponse {
description: &params.description,
log_type: &params.log_type,
log_id,
key: key.as_bytes(),
witness_key: witness_key.as_bytes(),
submission_url: &params.submission_url,
monitoring_url: if params.monitoring_url.is_empty() {
&params.submission_url
// Use an outer router as middleware to check that the log name is valid.
Router::new()
.or_else_any_method_async("/logs/:log/*route", |req, ctx| async move {
let name = if let Some(name) = ctx.param("log") {
if CONFIG.logs.contains_key(name) {
&name.clone()
} else {
&params.monitoring_url
},
mmd: MAX_MERGE_DELAY,
temporal_interval: &params.temporal_interval,
})
})
.get_async("/logs/:log/metrics", |_req, ctx| async move {
let name = valid_log_name(&ctx)?;
let stub = get_durable_object_stub(
&ctx.env,
name,
None,
"SEQUENCER",
CONFIG.logs[name].location_hint.as_deref(),
)?;
stub.fetch_with_str(&format!("http://fake_url.com{METRICS_ENDPOINT}"))
.await
})
.get_async("/logs/:log/*key", |_req, ctx| async move {
let name = valid_log_name(&ctx)?;
let key = ctx.param("key").unwrap();

let bucket = load_public_bucket(&ctx.env, name)?;
if let Some(obj) = bucket.get(key).execute().await? {
Response::from_body(
obj.body()
.ok_or("R2 object missing body")?
.response_body()?,
)
.map(|r| r.with_headers(headers_from_http_metadata(obj.http_metadata())))
return Err(UNKNOWN_LOG_MSG.into());
}
} else {
Response::error("Not found", 404)
}
return Err("missing 'log' route param".into());
};

// Now that we've validated the log name, use an inner router to
// handle the request.
Router::with_data(name)
.get("/logs/:log/ct/v1/get-roots", |_req, _ctx| {
Response::from_json(&GetRootsResponse {
certificates: x509_util::certs_to_bytes(&ROOTS.certs).unwrap(),
})
})
.post_async("/logs/:log/ct/v1/add-chain", |req, ctx| async move {
add_chain_or_pre_chain(req, &ctx.env, ctx.data, false).await
})
.post_async("/logs/:log/ct/v1/add-pre-chain", |req, ctx| async move {
add_chain_or_pre_chain(req, &ctx.env, ctx.data, true).await
})
.get("/logs/:log/log.v3.json", |_req, ctx| {
let name = ctx.data;
let params = &CONFIG.logs[name];
let verifying_key = load_signing_key(&ctx.env, name)?.verifying_key();
let log_id = &static_ct_api::log_id_from_key(verifying_key)
.map_err(|e| e.to_string())?;
let key = verifying_key
.to_public_key_der()
.map_err(|e| e.to_string())?;
Response::from_json(&LogV3JsonResponse {
description: &params.description,
log_type: &params.log_type,
log_id,
key: key.as_bytes(),
submission_url: &params.submission_url,
monitoring_url: if params.monitoring_url.is_empty() {
&params.submission_url
} else {
&params.monitoring_url
},
mmd: MAX_MERGE_DELAY_SECS,
temporal_interval: &params.temporal_interval,
})
})
.get_async("/logs/:log/metrics", |_req, ctx| async move {
let name = ctx.data;
let stub = get_durable_object_stub(
&ctx.env,
name,
None,
"SEQUENCER",
CONFIG.logs[name].location_hint.as_deref(),
)?;
stub.fetch_with_str(&format!("http://fake_url.com{METRICS_ENDPOINT}"))
.await
})
.get_async("/logs/:log/*key", |_req, ctx| async move {
let name = ctx.data;
let key = ctx.param("key").unwrap();

// Enable direct access to the bucket via the Worker if
// monitoring_url is unspecified.
if CONFIG.logs[name].monitoring_url.is_empty() {
let bucket = load_public_bucket(&ctx.env, name)?;
if let Some(obj) = bucket.get(key).execute().await? {
Response::from_body(
obj.body()
.ok_or("R2 object missing body")?
.response_body()?,
)
.map(|r| {
r.with_headers(headers_from_http_metadata(obj.http_metadata()))
})
} else {
Response::error("Not found", 404)
}
} else {
Response::error(
format!(
"Use {} for monitoring API",
CONFIG.logs[name].monitoring_url
),
403,
)
}
})
.run(req, ctx.env)
.await
})
.run(req, env)
.await
Expand Down Expand Up @@ -269,18 +292,6 @@ async fn add_chain_or_pre_chain(
Response::from_json(&sct)
}

fn valid_log_name(ctx: &RouteContext<()>) -> Result<&str> {
if let Some(name) = ctx.param("log") {
if CONFIG.logs.contains_key(name) {
Ok(name)
} else {
Err(UNKNOWN_LOG_MSG.into())
}
} else {
Err("missing 'log' route param".into())
}
}

fn batcher_id_from_lookup_key(key: &LookupKey, num_batchers: u8) -> u8 {
key[0] % num_batchers
}
Expand Down