Skip to content

Commit 661a36a

Browse files
author
Michael Rosenberg
committed
MTC: Implement caching for hot and cold calls to /get-landmark-bundle
* Started moving landmark bundle computation to cronjob * Fix type signature of CheckpointCallbacker to let the landmark bundle creator access the checkpoint bytes * Save the landmark bundle in R2 * Make landmark subtrees endpoint fetch from R2 * Return get-landmark-bundle directly from R2 * Implement cached bucket backend
1 parent 95a8e2b commit 661a36a

File tree

6 files changed

+161
-72
lines changed

6 files changed

+161
-72
lines changed

crates/generic_log_worker/src/lib.rs

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,13 @@ use serde::{Deserialize, Serialize};
2525
use serde_bytes::ByteBuf;
2626
use sha2::{Digest, Sha256};
2727
use std::cell::RefCell;
28-
use std::collections::{HashMap, VecDeque};
28+
use std::collections::btree_map::Entry;
29+
use std::collections::{BTreeMap, HashMap, VecDeque};
2930
use std::io::Write;
3031
use std::str::FromStr;
3132
use std::sync::Once;
3233
use tlog_tiles::{LookupKey, PendingLogEntry, SequenceMetadata};
34+
use tokio::sync::Mutex;
3335
use util::now_millis;
3436
use worker::{
3537
js_sys, kv, kv::KvStore, wasm_bindgen, Bucket, Env, Error, HttpMetadata, Result, State,
@@ -628,3 +630,43 @@ impl ObjectBackend for ObjectBucket {
628630
res
629631
}
630632
}
633+
634+
/// A read-only wrapper around an `ObjectBucket` that caches every fetch, no
/// matter how big. Nothing in this type evicts entries or bounds the cache
/// size, so it is presumably meant to be short-lived (TODO confirm with callers).
635+
pub struct CachedRoObjectBucket {
636+
// The wrapped bucket that cache misses are forwarded to.
bucket: ObjectBucket,
637+
// Fetch results keyed by object key. The `Option` returned by the wrapped
// bucket's fetch is stored as-is, so `None` results are cached as well.
cache: Mutex<BTreeMap<String, Option<Vec<u8>>>>,
638+
}
639+
640+
impl CachedRoObjectBucket {
641+
pub fn new(bucket: ObjectBucket) -> Self {
642+
CachedRoObjectBucket {
643+
bucket,
644+
cache: Mutex::new(BTreeMap::new()),
645+
}
646+
}
647+
}
648+
649+
impl ObjectBackend for CachedRoObjectBucket {
650+
async fn upload<S: AsRef<str>, D: Into<Vec<u8>>>(
651+
&self,
652+
_key: S,
653+
_data: D,
654+
_opts: &UploadOptions,
655+
) -> Result<()> {
656+
unimplemented!("CachedRoObjectBucket does not implement ObjectBackend::upload")
657+
}
658+
659+
async fn fetch<S: AsRef<str>>(&self, key: S) -> Result<Option<Vec<u8>>> {
660+
// See if the key is in the cache
661+
match self.cache.blocking_lock().entry(key.as_ref().to_string()) {
662+
// If so, return the value
663+
Entry::Occupied(oentry) => Ok(oentry.get().clone()),
664+
// Otherwise, fetch the value, cache it, and return it
665+
Entry::Vacant(ventry) => {
666+
let val = self.bucket.fetch(key).await?;
667+
ventry.insert(val.clone());
668+
Ok(val)
669+
}
670+
}
671+
}
672+
}

crates/generic_log_worker/src/log_ops.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1061,7 +1061,7 @@ async fn sequence_entries<L: LogEntry>(
10611061

10621062
// Call the checkpoint callback. This is a no-op for CT, but is used to
10631063
// update landmark checkpoints for MTC.
1064-
if let Err(e) = (config.checkpoint_callback)(n, old_time, timestamp).await {
1064+
if let Err(e) = (config.checkpoint_callback)(old_time, timestamp, new.checkpoint()).await {
10651065
warn!("{name}: Checkpoint callback failed: {e}");
10661066
}
10671067

crates/generic_log_worker/src/sequencer_do.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -307,14 +307,14 @@ impl<L: LogEntry> GenericSequencer<L> {
307307
/// so that it can have side-effects.
308308
///
309309
/// The parameters are as follows:
310-
/// - `tree_size: u64`: The tree size of the latest checkpoint.
311310
/// - `old_time: UnixTimestamp`: The timestamp of the previous checkpoint.
312311
/// - `new_time: UnixTimestamp`: The timestamp of the latest checkpoint.
312+
/// - `new_checkpoint: &[u8]`: The latest checkpoint bytes. This is a signed note.
313313
pub type CheckpointCallbacker = Box<
314314
dyn Fn(
315-
u64,
316315
UnixTimestamp,
317316
UnixTimestamp,
317+
&[u8],
318318
) -> Pin<Box<dyn Future<Output = Result<(), WorkerError>> + 'static>>
319319
+ 'static,
320320
>;
@@ -323,9 +323,8 @@ pub type CheckpointCallbacker = Box<
323323
/// don't need to perform any action after the checkpoint is updated.
324324
pub fn empty_checkpoint_callback() -> CheckpointCallbacker {
325325
Box::new(
326-
move |_tree_size: u64, _old_time: UnixTimestamp, _new_time: UnixTimestamp| {
326+
move |_old_time: UnixTimestamp, _new_time: UnixTimestamp, _new_checkpoint: &[u8]| {
327327
Box::pin(async move { Ok(()) })
328-
as Pin<Box<dyn Future<Output = Result<(), WorkerError>>>>
329328
},
330329
)
331330
}

crates/mtc_api/src/landmark.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,11 @@ pub struct LandmarkSequence {
99
pub landmarks: VecDeque<u64>,
1010
}
1111

12-
/// The location in object storage for the landmark bundle.
13-
pub static LANDMARK_KEY: &str = "landmark";
12+
/// The location in object storage for the landmark sequence
13+
pub const LANDMARK_KEY: &str = "landmark";
14+
15+
/// The location in object storage for the landmark bundle. Its serialized form is JSON
16+
pub const LANDMARK_BUNDLE_KEY: &str = "landmark-bundle";
1417

1518
impl LandmarkSequence {
1619
/// Create a new landmark sequence with the given `max_landmarks` and an

crates/mtc_worker/src/frontend_worker.rs

Lines changed: 10 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -11,21 +11,19 @@ use der::{
1111
use generic_log_worker::{
1212
batcher_id_from_lookup_key, deserialize, get_durable_object_stub, init_logging,
1313
load_public_bucket,
14-
log_ops::{
15-
prove_subtree_consistency, prove_subtree_inclusion, read_leaf, ProofError, CHECKPOINT_KEY,
16-
},
14+
log_ops::{prove_subtree_inclusion, read_leaf, ProofError, CHECKPOINT_KEY},
1715
serialize,
1816
util::now_millis,
1917
ObjectBackend, ObjectBucket, ENTRY_ENDPOINT, METRICS_ENDPOINT,
2018
};
2119
use mtc_api::{
2220
serialize_signatureless_cert, AddEntryRequest, AddEntryResponse, BootstrapMtcLogEntry,
23-
GetRootsResponse, LandmarkSequence, ID_RDNA_TRUSTANCHOR_ID, LANDMARK_KEY,
21+
GetRootsResponse, LandmarkSequence, ID_RDNA_TRUSTANCHOR_ID, LANDMARK_BUNDLE_KEY, LANDMARK_KEY,
2422
};
2523
use serde::{Deserialize, Serialize};
2624
use serde_with::{base64::Base64, serde_as};
2725
use signed_note::VerifierList;
28-
use std::{collections::VecDeque, time::Duration};
26+
use std::time::Duration;
2927
use tlog_tiles::{
3028
open_checkpoint, CheckpointSigner, CheckpointText, LeafIndex, PendingLogEntry,
3129
PendingLogEntryBlob,
@@ -72,24 +70,6 @@ pub struct GetCertificateResponse {
7270
pub landmark_id: usize,
7371
}
7472

75-
/// GET response structure for the `/get-landmark-bundle` endpoint
76-
#[serde_as]
77-
#[derive(Serialize)]
78-
pub struct GetLandmarkBundleResponse<'a> {
79-
pub checkpoint: &'a str,
80-
pub subtrees: Vec<SubtreeWithConsistencyProof>,
81-
pub landmarks: &'a VecDeque<u64>,
82-
}
83-
84-
#[serde_as]
85-
#[derive(Serialize, Deserialize)]
86-
pub struct SubtreeWithConsistencyProof {
87-
#[serde_as(as = "Base64")]
88-
pub hash: [u8; 32],
89-
#[serde_as(as = "Vec<Base64>")]
90-
pub consistency_proof: Vec<[u8; 32]>,
91-
}
92-
9373
/// Start is the first code run when the Wasm module is loaded.
9474
#[event(start)]
9575
fn start() {
@@ -412,41 +392,14 @@ async fn add_entry(mut req: Request, env: &Env, name: &str) -> Result<Response>
412392
async fn get_landmark_bundle(env: &Env, name: &str) -> Result<Response> {
413393
let object_backend = ObjectBucket::new(load_public_bucket(env, name)?);
414394

415-
// Fetch the current checkpoint.
416-
let (checkpoint, checkpoint_bytes) = get_current_checkpoint(env, name, &object_backend).await?;
417-
418-
// Fetch the current landmark sequence.
419-
let landmark_sequence = get_landmark_sequence(name, &object_backend).await?;
420-
421-
// Compute the sequence of landmark subtrees and, for each subtree, a proof of consistency with
422-
// the checkpoint. Each signatureless MTC includes an inclusion proof in one of these subtrees.
423-
let mut subtrees = Vec::new();
424-
for landmark_subtree in landmark_sequence.subtrees() {
425-
let (consistency_proof, landmark_subtree_hash) = match prove_subtree_consistency(
426-
*checkpoint.hash(),
427-
checkpoint.size(),
428-
landmark_subtree.lo(),
429-
landmark_subtree.hi(),
430-
&object_backend,
431-
)
432-
.await
433-
{
434-
Ok(p) => p,
435-
Err(ProofError::Tlog(s)) => return Response::error(s.to_string(), 422),
436-
Err(ProofError::Other(e)) => return Err(e.to_string().into()),
437-
};
438-
439-
subtrees.push(SubtreeWithConsistencyProof {
440-
hash: landmark_subtree_hash.0,
441-
consistency_proof: consistency_proof.iter().map(|h| h.0).collect(),
442-
});
443-
}
395+
// Fetch the current landmark bundle from R2 (already encoded in JSON) and return it
396+
let Some(landmark_bundle_bytes) = object_backend.fetch(LANDMARK_BUNDLE_KEY).await? else {
397+
return Err("failed to get landmark bundle".into());
398+
};
444399

445-
Response::from_json(&GetLandmarkBundleResponse {
446-
checkpoint: std::str::from_utf8(&checkpoint_bytes).unwrap(),
447-
subtrees,
448-
landmarks: &landmark_sequence.landmarks,
449-
})
400+
Ok(ResponseBuilder::new()
401+
.with_header("content-type", "application/json")?
402+
.body(ResponseBody::Body(landmark_bundle_bytes)))
450403
}
451404

452405
async fn get_current_checkpoint(

crates/mtc_worker/src/sequencer_do.rs

Lines changed: 99 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,20 @@
33

44
//! Sequencer is the 'brain' of the CT log, responsible for sequencing entries and maintaining log state.
55
6-
use std::{future::Future, pin::Pin, time::Duration};
6+
use std::{collections::VecDeque, time::Duration};
77

88
use crate::{load_checkpoint_cosigner, load_origin, CONFIG};
99
use generic_log_worker::{
10-
get_durable_object_name, load_public_bucket, CheckpointCallbacker, GenericSequencer,
11-
SequencerConfig, SEQUENCER_BINDING,
10+
get_durable_object_name, load_public_bucket,
11+
log_ops::{prove_subtree_consistency, ProofError},
12+
CachedRoObjectBucket, CheckpointCallbacker, GenericSequencer, ObjectBucket, SequencerConfig,
13+
SEQUENCER_BINDING,
1214
};
13-
use mtc_api::{BootstrapMtcLogEntry, LandmarkSequence, LANDMARK_KEY};
14-
use tlog_tiles::UnixTimestamp;
15+
use mtc_api::{BootstrapMtcLogEntry, LandmarkSequence, LANDMARK_BUNDLE_KEY, LANDMARK_KEY};
16+
use serde::{Deserialize, Serialize};
17+
use serde_with::{base64::Base64, serde_as};
18+
use signed_note::Note;
19+
use tlog_tiles::{CheckpointText, Hash, UnixTimestamp};
1520
#[allow(clippy::wildcard_imports)]
1621
use worker::*;
1722

@@ -53,14 +58,47 @@ impl DurableObject for Sequencer {
5358
}
5459
}
5560

61+
/// The hash of one landmark subtree together with the proof that the subtree
/// is consistent with a checkpoint. Both fields are serialized as Base64.
#[serde_as]
62+
#[derive(Serialize, Deserialize)]
63+
pub struct SubtreeWithConsistencyProof {
64+
// The 32-byte hash of the subtree.
#[serde_as(as = "Base64")]
65+
pub hash: [u8; 32],
66+
// The hashes that make up the consistency proof, each serialized as its
// own Base64 string.
#[serde_as(as = "Vec<Base64>")]
67+
pub consistency_proof: Vec<[u8; 32]>,
68+
}
69+
70+
/// GET response structure for the `/get-landmark-bundle` endpoint. It is
/// serialized to JSON and stored in the bucket under `LANDMARK_BUNDLE_KEY`.
71+
#[derive(Serialize, Deserialize)]
72+
pub struct LandmarkBundle {
73+
// The checkpoint bytes, as a UTF-8 string, that the subtrees were proven
// consistent with.
pub checkpoint: String,
74+
// One hash and consistency proof per landmark subtree.
pub subtrees: Vec<SubtreeWithConsistencyProof>,
75+
// The landmarks taken from the landmark sequence.
pub landmarks: VecDeque<u64>,
76+
}
77+
5678
/// Return a callback function that gets passed into the generic sequencer and
5779
/// called each time a new checkpoint is created. For MTC, this is used to
5880
/// periodically update the landmark checkpoint sequence.
5981
fn checkpoint_callback(env: &Env, name: &str) -> CheckpointCallbacker {
6082
let params = &CONFIG.logs[name];
6183
let bucket = load_public_bucket(env, name).unwrap();
6284
Box::new(
63-
move |tree_size: u64, old_time: UnixTimestamp, new_time: UnixTimestamp| {
85+
move |old_time: UnixTimestamp, new_time: UnixTimestamp, new_checkpoint_bytes: &[u8]| {
86+
let new_checkpoint = {
87+
// TODO: Make more efficient. There are two unnecessary allocations here.
88+
89+
// We can unwrap because the checkpoint provided is the checkpoint that the
90+
// sequencer just created, so it must be well formed.
91+
let note = Note::from_bytes(new_checkpoint_bytes)
92+
.expect("freshly created checkpoint is not a note");
93+
CheckpointText::from_bytes(note.text())
94+
.expect("freshly created checkpoint is not a checkpoint")
95+
};
96+
let tree_size = new_checkpoint.size();
97+
let root_hash = *new_checkpoint.hash();
98+
// We can unwrap here for the same reason as above
99+
let new_checkpoint_str = String::from_utf8(new_checkpoint_bytes.to_vec())
100+
.expect("freshly created checkpoint is not UTF-8");
101+
64102
Box::pin({
65103
// We have to clone each time since the bucket gets moved into
66104
// the async function.
@@ -100,9 +138,63 @@ fn checkpoint_callback(env: &Env, name: &str) -> CheckpointCallbacker {
100138
.execute()
101139
.await?;
102140
}
141+
142+
// Compute the landmark bundle and save it
143+
let subtrees =
144+
get_landmark_subtrees(&seq, root_hash, tree_size, bucket_clone.clone())
145+
.await?;
146+
let bundle = LandmarkBundle {
147+
checkpoint: new_checkpoint_str,
148+
subtrees,
149+
landmarks: seq.landmarks,
150+
};
151+
// TODO: the put operation here should be done with the put operation above.
152+
// Otherwise an error here might put us in a state where the landmark bundle is
153+
// out of sync with the landmark sequence. We need an all-or-nothing multi-put
154+
// operation.
155+
bucket_clone
156+
// Can unwrap here because we use the autoderived Serialize impl for LandmarkBundle
157+
.put(LANDMARK_BUNDLE_KEY, serde_json::to_vec(&bundle).unwrap())
158+
.execute()
159+
.await?;
160+
103161
Ok(())
104162
}
105-
}) as Pin<Box<dyn Future<Output = Result<()>>>>
163+
})
106164
},
107165
)
108166
}
167+
168+
// Computes the sequence of landmark subtrees and, for each subtree, a proof of consistency with the
169+
// checkpoint. Each signatureless MTC includes an inclusion proof in one of these subtrees.
170+
async fn get_landmark_subtrees(
171+
landmark_sequence: &LandmarkSequence,
172+
checkpoint_hash: Hash,
173+
checkpoint_size: u64,
174+
bucket: Bucket,
175+
) -> Result<Vec<SubtreeWithConsistencyProof>> {
176+
let cached_object_backend = CachedRoObjectBucket::new(ObjectBucket::new(bucket));
177+
let mut subtrees = Vec::new();
178+
for landmark_subtree in landmark_sequence.subtrees() {
179+
let (consistency_proof, landmark_subtree_hash) = match prove_subtree_consistency(
180+
checkpoint_hash,
181+
checkpoint_size,
182+
landmark_subtree.lo(),
183+
landmark_subtree.hi(),
184+
&cached_object_backend,
185+
)
186+
.await
187+
{
188+
Ok(p) => p,
189+
Err(ProofError::Tlog(s)) => return Err(s.to_string().into()),
190+
Err(ProofError::Other(e)) => return Err(e.to_string().into()),
191+
};
192+
193+
subtrees.push(SubtreeWithConsistencyProof {
194+
hash: landmark_subtree_hash.0,
195+
consistency_proof: consistency_proof.iter().map(|h| h.0).collect(),
196+
});
197+
}
198+
199+
Ok(subtrees)
200+
}

0 commit comments

Comments
 (0)