Commit 21b429f

Use bitcode instead of json serde, and remove generics from batcher (#68)
- Use bitcode instead of json for more efficient serialization for communication across Workers and DOs. This is an intermediate step short of full RPC support (#3).
- Use bitcode instead of json for serializing the staging bundle.
- Introduce a `PendingLogEntryBlob` struct that wraps the serialized bytes and the lookup key of a `PendingLogEntry`, so that the Batcher doesn't need to fully deserialize entries; all it needs is the lookup key. This allows us to remove the generic type from the Batcher altogether (fixes #67).

Other:
- Pass an argument to the `durable_object` macro to reduce generated code size (Batchers only need fetch methods, and Sequencers need fetch and alarm, but not websocket).
- Clean up code in various places.
1 parent 0be6f6f commit 21b429f
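Based on the description above and the fields used in the frontend diff below, the new wrapper presumably looks roughly like the following sketch (derives and field types are assumptions; the actual definition lives in tlog_tiles):

use serde::{Deserialize, Serialize};
use tlog_tiles::LookupKey;

/// Sketch only: the Batcher reads the lookup key and treats the entry
/// itself as opaque bitcode-serialized bytes, so it never needs the
/// concrete PendingLogEntry type.
#[derive(Serialize, Deserialize)]
pub struct PendingLogEntryBlob {
    /// Deduplication/lookup key of the wrapped entry.
    pub lookup_key: LookupKey,
    /// bitcode-serialized bytes of the underlying PendingLogEntry,
    /// deserialized only once the entry reaches the Sequencer.
    pub data: Vec<u8>,
}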

File tree: 19 files changed, +188 −114 lines


Cargo.lock

Lines changed: 43 additions & 1 deletion
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -32,6 +32,7 @@ debug = 1
 [workspace.dependencies]
 anyhow = "1.0"
 base64 = "0.22"
+bitcode = { version = "0.6.6", features = ["serde"] }
 byteorder = "1.5"
 chrono = { version = "0.4", features = ["serde"] }
 console_error_panic_hook = "0.1.1"

crates/ct_worker/build.rs

Lines changed: 2 additions & 3 deletions
@@ -5,7 +5,6 @@
 
 use chrono::Months;
 use config::AppConfig;
-use serde_json::from_str;
 use std::env;
 use std::fs;
 use url::Url;
@@ -19,10 +18,10 @@ fn main() {
     });
 
     // Validate the config json against the schema.
-    let json = from_str(config_contents).unwrap_or_else(|e| {
+    let json = serde_json::from_str(config_contents).unwrap_or_else(|e| {
         panic!("failed to deserialize JSON config '{config_file}': {e}");
     });
-    let schema = from_str(include_str!("config.schema.json")).unwrap_or_else(|e| {
+    let schema = serde_json::from_str(include_str!("config.schema.json")).unwrap_or_else(|e| {
         panic!("failed to deserialize JSON schema 'config.schema.json': {e}");
     });
     jsonschema::validate(&schema, &json).unwrap_or_else(|e| {

crates/ct_worker/src/batcher_do.rs

Lines changed: 2 additions & 3 deletions
@@ -1,11 +1,10 @@
 use crate::CONFIG;
 use generic_log_worker::{get_durable_object_stub, load_cache_kv, BatcherConfig, GenericBatcher};
-use static_ct_api::StaticCTPendingLogEntry;
 #[allow(clippy::wildcard_imports)]
 use worker::*;
 
-#[durable_object]
-struct Batcher(GenericBatcher<StaticCTPendingLogEntry>);
+#[durable_object(fetch)]
+struct Batcher(GenericBatcher);
 
 impl DurableObject for Batcher {
     fn new(state: State, env: Env) -> Self {

crates/ct_worker/src/frontend_worker.rs

Lines changed: 21 additions & 25 deletions
@@ -5,12 +5,13 @@
 
 use std::sync::LazyLock;
 
-use crate::{load_signing_key, LookupKey, SequenceMetadata, CONFIG, ROOTS};
+use crate::{load_signing_key, SequenceMetadata, CONFIG, ROOTS};
 use config::TemporalInterval;
 use futures_util::future::try_join_all;
 use generic_log_worker::{
-    get_cached_metadata, get_durable_object_stub, init_logging, load_cache_kv, load_public_bucket,
-    log_ops::UploadOptions, put_cache_entry_metadata, ObjectBackend, ObjectBucket, ENTRY_ENDPOINT,
+    batcher_id_from_lookup_key, deserialize, get_cached_metadata, get_durable_object_stub,
+    init_logging, load_cache_kv, load_public_bucket, log_ops::UploadOptions,
+    put_cache_entry_metadata, serialize, ObjectBackend, ObjectBucket, ENTRY_ENDPOINT,
     METRICS_ENDPOINT,
 };
 use log::{debug, info, warn};
@@ -19,7 +20,7 @@ use serde::Serialize;
 use serde_with::{base64::Base64, serde_as};
 use sha2::{Digest, Sha256};
 use static_ct_api::{AddChainRequest, GetRootsResponse, StaticCTLogEntry};
-use tlog_tiles::{LogEntry, PendingLogEntry};
+use tlog_tiles::{LogEntry, PendingLogEntry, PendingLogEntryBlob};
 #[allow(clippy::wildcard_imports)]
 use worker::*;
 
@@ -176,7 +177,6 @@ async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
     })
 }
 
-#[allow(clippy::too_many_lines)]
 async fn add_chain_or_pre_chain(
     mut req: Request,
     env: &Env,
@@ -215,8 +215,7 @@
 
     // Check if entry is cached and return right away if so.
     if params.enable_dedup {
-        if let Some(metadata) =
-            get_cached_metadata(&load_cache_kv(env, name)?, &pending_entry).await?
+        if let Some(metadata) = get_cached_metadata(&load_cache_kv(env, name)?, &lookup_key).await?
         {
             debug!("{name}: Entry is cached");
             let entry = StaticCTLogEntry::new(pending_entry, metadata);
@@ -242,30 +241,31 @@ async fn add_chain_or_pre_chain(
 
     // Submit entry to be sequenced, either via a batcher or directly to the
     // sequencer.
-    let stub = if params.num_batchers > 0 {
-        let batcher_id = batcher_id_from_lookup_key(&lookup_key, params.num_batchers);
+    let stub = {
+        let shard_id = batcher_id_from_lookup_key(&lookup_key, params.num_batchers);
         get_durable_object_stub(
            env,
            name,
-            Some(batcher_id),
-            "BATCHER",
-            params.location_hint.as_deref(),
-        )?
-    } else {
-        get_durable_object_stub(
-            env,
-            name,
-            None,
-            "SEQUENCER",
+            shard_id,
+            if shard_id.is_some() {
+                "BATCHER"
+            } else {
+                "SEQUENCER"
+            },
             params.location_hint.as_deref(),
         )?
     };
+
+    let serialized = serialize(&PendingLogEntryBlob {
+        lookup_key,
+        data: serialize(&pending_entry)?,
+    })?;
     let mut response = stub
         .fetch_with_request(Request::new_with_init(
             &format!("http://fake_url.com{ENTRY_ENDPOINT}"),
             &RequestInit {
                 method: Method::Post,
-                body: Some(serde_json::to_string(&pending_entry)?.into()),
+                body: Some(serialized.into()),
                 ..Default::default()
             },
         )?)
@@ -274,7 +274,7 @@ async fn add_chain_or_pre_chain(
         // Return the response from the sequencing directly to the client.
         return Ok(response);
     }
-    let metadata = response.json::<SequenceMetadata>().await?;
+    let metadata = deserialize::<SequenceMetadata>(&response.bytes().await?)?;
    if params.num_batchers == 0 && params.enable_dedup {
        // Write sequenced entry to the long-term deduplication cache in Workers
        // KV as there are no batchers configured to do it for us.
@@ -295,10 +295,6 @@
     Response::from_json(&sct)
 }
 
-fn batcher_id_from_lookup_key(key: &LookupKey, num_batchers: u8) -> u8 {
-    key[0] % num_batchers
-}
-
 fn headers_from_http_metadata(meta: HttpMetadata) -> Headers {
     let h = Headers::new();
     if let Some(hdr) = meta.cache_control {

crates/ct_worker/src/lib.rs

Lines changed: 1 addition & 1 deletion
@@ -10,7 +10,7 @@ use signed_note::KeyName;
 use static_ct_api::StaticCTCheckpointSigner;
 use std::collections::HashMap;
 use std::sync::{LazyLock, OnceLock};
-use tlog_tiles::{CheckpointSigner, Ed25519CheckpointSigner, LookupKey, SequenceMetadata};
+use tlog_tiles::{CheckpointSigner, Ed25519CheckpointSigner, SequenceMetadata};
 #[allow(clippy::wildcard_imports)]
 use worker::*;
 use x509_cert::Certificate;

crates/ct_worker/src/sequencer_do.rs

Lines changed: 1 addition & 1 deletion
@@ -12,7 +12,7 @@ use static_ct_api::StaticCTLogEntry;
 #[allow(clippy::wildcard_imports)]
 use worker::*;
 
-#[durable_object]
+#[durable_object(alarm)]
 struct Sequencer(GenericSequencer<StaticCTLogEntry>);
 
 impl DurableObject for Sequencer {

crates/generic_log_worker/Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -22,6 +22,7 @@ static_ct_api.workspace = true
 [dependencies]
 anyhow.workspace = true
 base64.workspace = true
+bitcode.workspace = true
 byteorder.workspace = true
 console_error_panic_hook.workspace = true
 console_log.workspace = true
@@ -35,7 +36,6 @@ rand.workspace = true
 serde-wasm-bindgen.workspace = true
 serde.workspace = true
 serde_bytes.workspace = true
-serde_json.workspace = true
 sha2.workspace = true
 signed_note.workspace = true
 thiserror.workspace = true
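With bitcode now a dependency here, the serialize and deserialize helpers imported throughout these diffs are presumably thin wrappers over bitcode's serde API; a rough sketch under that assumption (the mapping to worker::Error is guessed):

use serde::{de::DeserializeOwned, Serialize};

/// Sketch: encode any serde-serializable value with bitcode.
pub fn serialize<T: Serialize>(value: &T) -> worker::Result<Vec<u8>> {
    bitcode::serialize(value).map_err(|e| worker::Error::RustError(e.to_string()))
}

/// Sketch: decode bitcode bytes back into a serde-deserializable value.
pub fn deserialize<T: DeserializeOwned>(bytes: &[u8]) -> worker::Result<T> {
    bitcode::deserialize(bytes).map_err(|e| worker::Error::RustError(e.to_string()))
}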

crates/generic_log_worker/src/batcher_do.rs

Lines changed: 31 additions & 19 deletions
@@ -6,25 +6,25 @@
 //!
 //! Entries are assigned to Batcher shards with consistent hashing on the cache key.
 
-use crate::{LookupKey, SequenceMetadata, BATCH_ENDPOINT, ENTRY_ENDPOINT};
+use crate::{deserialize, serialize, LookupKey, SequenceMetadata, BATCH_ENDPOINT, ENTRY_ENDPOINT};
 use base64::prelude::*;
 use futures_util::future::{join_all, select, Either};
 use std::{
     cell::RefCell,
     collections::{HashMap, HashSet},
     time::Duration,
 };
-use tlog_tiles::PendingLogEntry;
+use tlog_tiles::PendingLogEntryBlob;
 use tokio::sync::watch::{self, Sender};
 use worker::kv::KvStore;
 #[allow(clippy::wildcard_imports)]
 use worker::*;
 
-pub struct GenericBatcher<E: PendingLogEntry> {
+pub struct GenericBatcher {
     config: BatcherConfig,
     kv: Option<KvStore>,
     sequencer: Stub,
-    batch: RefCell<Batch<E>>,
+    batch: RefCell<Batch>,
     in_flight: RefCell<usize>,
     processed: RefCell<usize>,
 }
@@ -36,13 +36,13 @@ pub struct BatcherConfig {
 }
 
 // A batch of entries to be submitted to the Sequencer together.
-struct Batch<E: PendingLogEntry> {
-    entries: Vec<E>,
+struct Batch {
+    entries: Vec<PendingLogEntryBlob>,
     by_hash: HashSet<LookupKey>,
     done: Sender<HashMap<LookupKey, SequenceMetadata>>,
 }
 
-impl<E: PendingLogEntry> Default for Batch<E> {
+impl Default for Batch {
     /// Returns a batch initialized with a watch channel.
     fn default() -> Self {
         Self {
@@ -53,7 +53,7 @@ impl<E: PendingLogEntry> Default for Batch<E> {
     }
 }
 
-impl<E: PendingLogEntry> GenericBatcher<E> {
+impl GenericBatcher {
     /// Returns a new batcher with the given config.
     pub fn new(config: BatcherConfig, kv: Option<KvStore>, sequencer: Stub) -> Self {
         Self {
@@ -77,8 +77,8 @@ impl<E: PendingLogEntry> GenericBatcher<E> {
     pub async fn fetch(&self, mut req: Request) -> Result<Response> {
         match req.path().as_str() {
             ENTRY_ENDPOINT => {
-                let entry: E = req.json().await?;
-                let key = entry.lookup_key();
+                let entry: PendingLogEntryBlob = deserialize(&req.bytes().await?)?;
+                let key = entry.lookup_key;
 
                 *self.in_flight.borrow_mut() += 1;
                 *self.processed.borrow_mut() += 1;
@@ -135,7 +135,7 @@ impl<E: PendingLogEntry> GenericBatcher<E> {
 
                 let resp = if let Some(value) = recv.borrow().get(&key) {
                     // The entry has been sequenced!
-                    Response::from_json(&value)
+                    Response::from_bytes(serialize(&value)?)
                 } else {
                     // Failed to sequence this entry, either due to an error
                     // submitting the batch or rate limiting at the Sequencer.
@@ -150,7 +150,7 @@ impl<E: PendingLogEntry> GenericBatcher<E> {
     }
 }
 
-impl<E: PendingLogEntry> GenericBatcher<E> {
+impl GenericBatcher {
     /// Submit the current pending batch to be sequenced.
     ///
     /// # Errors
@@ -174,16 +174,19 @@ impl<E: PendingLogEntry> GenericBatcher<E> {
             &format!("http://fake_url.com{BATCH_ENDPOINT}"),
             &RequestInit {
                 method: Method::Post,
-                body: Some(serde_json::to_string(&batch.entries)?.into()),
+                body: Some(serialize(&batch.entries)?.into()),
                 ..Default::default()
             },
         )?;
-        let sequenced_entries: HashMap<LookupKey, SequenceMetadata> = self
-            .sequencer
-            .fetch_with_request(req)
-            .await?
-            .json::<Vec<(LookupKey, SequenceMetadata)>>()
-            .await?
+        let sequenced_entries: HashMap<LookupKey, SequenceMetadata> =
+            deserialize::<Vec<(LookupKey, SequenceMetadata)>>(
+                &self
+                    .sequencer
+                    .fetch_with_request(req)
+                    .await?
+                    .bytes()
+                    .await?,
+            )?
             .into_iter()
             .collect();
 
@@ -206,3 +209,12 @@
         Ok(())
     }
 }
+
+/// Return a batcher ID to which the provided entry should be assigned.
+pub fn batcher_id_from_lookup_key(key: &LookupKey, num_batchers: u8) -> Option<u8> {
+    if num_batchers > 0 {
+        Some(key[0] % num_batchers)
+    } else {
+        None
+    }
+}
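A quick illustration of the new sharding helper with hypothetical values (assumes LookupKey is a fixed-size byte array, as the key[0] indexing suggests):

// Hypothetical: a key whose first byte is 0x2a (42), with four batcher shards.
let key: LookupKey = [0x2a; 16]; // assumed LookupKey = [u8; 16]
assert_eq!(batcher_id_from_lookup_key(&key, 4), Some(2)); // 42 % 4 == 2 -> BATCHER shard 2
assert_eq!(batcher_id_from_lookup_key(&key, 0), None); // no batchers -> go straight to the SEQUENCER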
