Skip to content

Commit 4a4d285

Browse files
Do not double compress transaction update messages
1 parent 4dbd71a commit 4a4d285

File tree

5 files changed

+44
-91
lines changed

5 files changed

+44
-91
lines changed

crates/bench/benches/subscription.rs

-1
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,6 @@ fn eval(c: &mut Criterion) {
138138
&plans,
139139
table_id,
140140
table_name.clone(),
141-
Compression::None,
142141
&tx,
143142
TableUpdateType::Subscribe,
144143
)))

crates/core/src/host/module_host.rs

+6-2
Original file line numberDiff line numberDiff line change
@@ -128,13 +128,17 @@ impl UpdatesRelValue<'_> {
128128
!(self.deletes.is_empty() && self.inserts.is_empty())
129129
}
130130

131-
pub fn encode<F: WebsocketFormat>(&self, compression: Compression) -> (F::QueryUpdate, u64, usize) {
131+
pub fn encode<F: WebsocketFormat>(&self) -> (F::QueryUpdate, u64, usize) {
132132
let (deletes, nr_del) = F::encode_list(self.deletes.iter());
133133
let (inserts, nr_ins) = F::encode_list(self.inserts.iter());
134134
let num_rows = nr_del + nr_ins;
135135
let num_bytes = deletes.num_bytes() + inserts.num_bytes();
136136
let qu = QueryUpdate { deletes, inserts };
137-
let cqu = F::into_query_update(qu, compression);
137+
// We don't compress individual table updates.
138+
// Previously we were, but the benefits, if any, were unclear.
139+
// Note, each message is still compressed before being sent to clients,
140+
// but we no longer have to hold a tx lock when doing so.
141+
let cqu = F::into_query_update(qu, Compression::None);
138142
(cqu, num_rows, num_bytes)
139143
}
140144
}

crates/core/src/subscription/mod.rs

+5-4
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,6 @@ pub fn collect_table_update<Tx, F>(
115115
plan_fragments: &[PipelinedProject],
116116
table_id: TableId,
117117
table_name: Box<str>,
118-
comp: Compression,
119118
tx: &Tx,
120119
update_type: TableUpdateType,
121120
) -> Result<(TableUpdate<F>, ExecutionMetrics)>
@@ -135,15 +134,17 @@ where
135134
inserts: empty,
136135
},
137136
};
138-
let update = F::into_query_update(qu, comp);
137+
// We will compress the outer server message,
138+
// after we release the tx lock.
139+
// There's no need to compress the inner table update too.
140+
let update = F::into_query_update(qu, Compression::None);
139141
(TableUpdate::new(table_id, table_name, (update, num_rows)), metrics)
140142
})
141143
}
142144

143145
/// Execute a collection of subscription queries in parallel
144146
pub fn execute_plans<Tx, F>(
145147
plans: &[Arc<Plan>],
146-
comp: Compression,
147148
tx: &Tx,
148149
update_type: TableUpdateType,
149150
) -> Result<(DatabaseUpdate<F>, ExecutionMetrics), DBError>
@@ -160,7 +161,7 @@ where
160161
.clone()
161162
.optimize()
162163
.map(|plan| (sql, PipelinedProject::from(plan)))
163-
.and_then(|(_, plan)| collect_table_update(&[plan], table_id, table_name.into(), comp, tx, update_type))
164+
.and_then(|(_, plan)| collect_table_update(&[plan], table_id, table_name.into(), tx, update_type))
164165
.map_err(|err| DBError::WithSql {
165166
sql: sql.into(),
166167
error: Box::new(DBError::Other(err)),

crates/core/src/subscription/module_subscription_actor.rs

+12-68
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ use crate::worker_metrics::WORKER_METRICS;
2323
use parking_lot::RwLock;
2424
use prometheus::IntGauge;
2525
use spacetimedb_client_api_messages::websocket::{
26-
self as ws, BsatnFormat, Compression, FormatSwitch, JsonFormat, SubscribeMulti, SubscribeSingle, TableUpdate,
27-
Unsubscribe, UnsubscribeMulti,
26+
self as ws, BsatnFormat, FormatSwitch, JsonFormat, SubscribeMulti, SubscribeSingle, TableUpdate, Unsubscribe,
27+
UnsubscribeMulti,
2828
};
2929
use spacetimedb_execution::pipelined::PipelinedProject;
3030
use spacetimedb_expr::check::parse_and_type_sub;
@@ -186,34 +186,10 @@ impl ModuleSubscriptions {
186186
let tx = DeltaTx::from(tx);
187187

188188
Ok(match sender.config.protocol {
189-
Protocol::Binary => {
190-
collect_table_update(
191-
&plans,
192-
table_id,
193-
table_name.into(),
194-
// We will compress the outer server message,
195-
// after we release the tx lock.
196-
// There's no need to compress the inner table update too.
197-
Compression::None,
198-
&tx,
199-
update_type,
200-
)
201-
.map(|(table_update, metrics)| (FormatSwitch::Bsatn(table_update), metrics))
202-
}
203-
Protocol::Text => {
204-
collect_table_update(
205-
&plans,
206-
table_id,
207-
table_name.into(),
208-
// We will compress the outer server message,
209-
// after we release the tx lock,
210-
// There's no need to compress the inner table update too.
211-
Compression::None,
212-
&tx,
213-
update_type,
214-
)
215-
.map(|(table_update, metrics)| (FormatSwitch::Json(table_update), metrics))
216-
}
189+
Protocol::Binary => collect_table_update(&plans, table_id, table_name.into(), &tx, update_type)
190+
.map(|(table_update, metrics)| (FormatSwitch::Bsatn(table_update), metrics)),
191+
Protocol::Text => collect_table_update(&plans, table_id, table_name.into(), &tx, update_type)
192+
.map(|(table_update, metrics)| (FormatSwitch::Json(table_update), metrics)),
217193
}?)
218194
}
219195

@@ -240,27 +216,11 @@ impl ModuleSubscriptions {
240216
let tx = DeltaTx::from(tx);
241217
match sender.config.protocol {
242218
Protocol::Binary => {
243-
let (update, metrics) = execute_plans(
244-
queries,
245-
// We will compress the outer server message,
246-
// after we release the tx lock.
247-
// There's no need to compress the inner table updates too.
248-
Compression::None,
249-
&tx,
250-
update_type,
251-
)?;
219+
let (update, metrics) = execute_plans(queries, &tx, update_type)?;
252220
Ok((FormatSwitch::Bsatn(update), metrics))
253221
}
254222
Protocol::Text => {
255-
let (update, metrics) = execute_plans(
256-
queries,
257-
// We will compress the outer server message,
258-
// after we release the tx lock.
259-
// There's no need to compress the inner table updates too.
260-
Compression::None,
261-
&tx,
262-
update_type,
263-
)?;
223+
let (update, metrics) = execute_plans(queries, &tx, update_type)?;
264224
Ok((FormatSwitch::Json(update), metrics))
265225
}
266226
}
@@ -650,26 +610,10 @@ impl ModuleSubscriptions {
650610

651611
let tx = DeltaTx::from(&*tx);
652612
let (database_update, metrics) = match sender.config.protocol {
653-
Protocol::Binary => execute_plans(
654-
&queries,
655-
// We will compress the outer server message,
656-
// after we release the tx lock.
657-
// There's no need to compress the inner table updates too.
658-
Compression::None,
659-
&tx,
660-
TableUpdateType::Subscribe,
661-
)
662-
.map(|(table_update, metrics)| (FormatSwitch::Bsatn(table_update), metrics))?,
663-
Protocol::Text => execute_plans(
664-
&queries,
665-
// We will compress the outer server message,
666-
// after we release the tx lock.
667-
// There's no need to compress the inner table updates too.
668-
Compression::None,
669-
&tx,
670-
TableUpdateType::Subscribe,
671-
)
672-
.map(|(table_update, metrics)| (FormatSwitch::Json(table_update), metrics))?,
613+
Protocol::Binary => execute_plans(&queries, &tx, TableUpdateType::Subscribe)
614+
.map(|(table_update, metrics)| (FormatSwitch::Bsatn(table_update), metrics))?,
615+
Protocol::Text => execute_plans(&queries, &tx, TableUpdateType::Subscribe)
616+
.map(|(table_update, metrics)| (FormatSwitch::Json(table_update), metrics))?,
673617
};
674618

675619
record_exec_metrics(

crates/core/src/subscription/module_subscription_manager.rs

+21-16
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use hashbrown::{HashMap, HashSet};
1616
use itertools::Itertools;
1717
use rayon::iter::{IntoParallelIterator, ParallelIterator};
1818
use spacetimedb_client_api_messages::websocket::{
19-
BsatnFormat, CompressableQueryUpdate, Compression, FormatSwitch, JsonFormat, QueryId, QueryUpdate, WebsocketFormat,
19+
BsatnFormat, CompressableQueryUpdate, FormatSwitch, JsonFormat, QueryId, QueryUpdate, WebsocketFormat,
2020
};
2121
use spacetimedb_data_structures::map::{Entry, IntMap};
2222
use spacetimedb_lib::metrics::ExecutionMetrics;
@@ -817,26 +817,37 @@ impl SubscriptionManager {
817817
.fold(FoldState::default, |mut acc, (qstate, plan)| {
818818
let table_id = plan.subscribed_table_id();
819819
let table_name = plan.subscribed_table_name();
820-
// Store at most one copy of the serialization to BSATN x Compression
821-
// and ditto for the "serialization" for JSON.
820+
// Store at most one copy for both the serialization to BSATN and JSON.
822821
// Each subscriber gets to pick which of these they want,
823-
// but we only fill `ops_bin_{compression}` and `ops_json` at most once.
822+
// but we only fill `ops_bin_uncompressed` and `ops_json` at most once.
824823
// The former will be `Some(_)` if some subscriber uses `Protocol::Binary`
825824
// and the latter `Some(_)` if some subscriber uses `Protocol::Text`.
826-
let mut ops_bin_brotli: Option<(CompressableQueryUpdate<BsatnFormat>, _, _)> = None;
827-
let mut ops_bin_gzip: Option<(CompressableQueryUpdate<BsatnFormat>, _, _)> = None;
828-
let mut ops_bin_none: Option<(CompressableQueryUpdate<BsatnFormat>, _, _)> = None;
825+
//
826+
// Previously we were compressing each `QueryUpdate` within a `TransactionUpdate`.
827+
// The reason was simple - many clients can subscribe to the same query.
828+
// If we compress `TransactionUpdate`s independently for each client,
829+
// we could be doing a lot of redundant compression.
830+
//
831+
// However the risks associated with this approach include:
832+
// 1. We have to hold the tx lock when compressing
833+
// 2. A potentially worse compression ratio
834+
// 3. Extra decompression overhead on the client
835+
//
836+
// Because transaction processing is currently single-threaded,
837+
// the risks of holding the tx lock for longer than necessary,
838+
// as well as additional the message processing overhead on the client,
839+
// outweighed the benefit of reduced cpu with the former approach.
840+
let mut ops_bin_uncompressed: Option<(CompressableQueryUpdate<BsatnFormat>, _, _)> = None;
829841
let mut ops_json: Option<(QueryUpdate<JsonFormat>, _, _)> = None;
830842

831843
fn memo_encode<F: WebsocketFormat>(
832844
updates: &UpdatesRelValue<'_>,
833-
client: &ClientConnectionSender,
834845
memory: &mut Option<(F::QueryUpdate, u64, usize)>,
835846
metrics: &mut ExecutionMetrics,
836847
) -> (F::QueryUpdate, u64) {
837848
let (update, num_rows, num_bytes) = memory
838849
.get_or_insert_with(|| {
839-
let encoded = updates.encode::<F>(client.config.compression);
850+
let encoded = updates.encode::<F>();
840851
// The first time we insert into this map, we call encode.
841852
// This is when we serialize the rows to BSATN/JSON.
842853
// Hence this is where we increment `bytes_scanned`.
@@ -884,17 +895,11 @@ impl SubscriptionManager {
884895
let update = match client.config.protocol {
885896
Protocol::Binary => Bsatn(memo_encode::<BsatnFormat>(
886897
&delta_updates,
887-
client,
888-
match client.config.compression {
889-
Compression::Brotli => &mut ops_bin_brotli,
890-
Compression::Gzip => &mut ops_bin_gzip,
891-
Compression::None => &mut ops_bin_none,
892-
},
898+
&mut ops_bin_uncompressed,
893899
&mut acc.metrics,
894900
)),
895901
Protocol::Text => Json(memo_encode::<JsonFormat>(
896902
&delta_updates,
897-
client,
898903
&mut ops_json,
899904
&mut acc.metrics,
900905
)),

0 commit comments

Comments
 (0)