Skip to content

Commit 4dbd71a

Browse files
Add regression test tx update compression
1 parent b1002f7 commit 4dbd71a

File tree

1 file changed

+53
-1
lines changed

1 file changed

+53
-1
lines changed

crates/core/src/subscription/module_subscription_actor.rs

+53-1
Original file line numberDiff line numberDiff line change
@@ -1539,7 +1539,9 @@ mod tests {
15391539
Ok(())
15401540
}
15411541

1542-
/// Test that we do not compress the results of an initial subscribe call
1542+
/// Test that we do not compress within a [SubscriptionMessage].
1543+
/// The message itself is compressed before being sent over the wire,
1544+
/// but we don't care about that for this test.
15431545
#[tokio::test]
15441546
async fn test_no_compression_for_subscribe() -> anyhow::Result<()> {
15451547
// Establish a client connection with compression
@@ -1583,6 +1585,56 @@ mod tests {
15831585
Ok(())
15841586
}
15851587

1588+
/// Test that we do not compress within a [TransactionUpdateMessage].
1589+
/// The message itself is compressed before being sent over the wire,
1590+
/// but we don't care about that for this test.
1591+
#[tokio::test]
1592+
async fn test_no_compression_for_update() -> anyhow::Result<()> {
1593+
// Establish a client connection with compression
1594+
let (tx, mut rx) = client_connection_with_compression(client_id_from_u8(1), Compression::Brotli);
1595+
1596+
let db = relational_db()?;
1597+
let subs = module_subscriptions(db.clone());
1598+
1599+
let table_id = db.create_table_for_test("t", &[("x", AlgebraicType::U64)], &[])?;
1600+
1601+
let mut inserts = vec![];
1602+
1603+
for i in 0..16_000u64 {
1604+
inserts.push((table_id, product![i]));
1605+
}
1606+
1607+
// Subscribe to the entire table
1608+
subscribe_multi(&subs, &["select * from t"], tx, &mut 0)?;
1609+
1610+
// Wait to receive the initial subscription message
1611+
assert!(matches!(rx.recv().await, Some(SerializableMessage::Subscription(_))));
1612+
1613+
// Insert a lot of rows into `t`.
1614+
// We want to insert enough to cross any threshold there might be for compression.
1615+
commit_tx(&db, &subs, [], inserts)?;
1616+
1617+
// Assert the table updates within this message are all be uncompressed
1618+
match rx.recv().await {
1619+
Some(SerializableMessage::TxUpdate(TransactionUpdateMessage {
1620+
database_update:
1621+
SubscriptionUpdateMessage {
1622+
database_update: FormatSwitch::Bsatn(ws::DatabaseUpdate { tables }),
1623+
..
1624+
},
1625+
..
1626+
})) => {
1627+
assert!(tables.iter().all(|TableUpdate { updates, .. }| updates
1628+
.iter()
1629+
.all(|query_update| matches!(query_update, CompressableQueryUpdate::Uncompressed(_)))));
1630+
}
1631+
Some(_) => panic!("unexpected message from subscription"),
1632+
None => panic!("channel unexpectedly closed"),
1633+
};
1634+
1635+
Ok(())
1636+
}
1637+
15861638
/// In this test we subscribe to a join query, update the lhs table,
15871639
/// and assert that the server sends the correct delta to the client.
15881640
#[tokio::test]

0 commit comments

Comments
 (0)