Skip to content

Commit 9fc5225

Browse files
committed
Fix V2 statement wire format and remove hidden FPR defaults
1 parent d53883a commit 9fc5225

File tree

8 files changed

+102
-93
lines changed

8 files changed

+102
-93
lines changed

lib/src/network/codec/statement.rs

Lines changed: 68 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,8 @@ const PROOF_ON_CHAIN: u8 = 3;
4848
pub use super::affinity::AffinityFilter;
4949

5050
#[derive(Debug, Clone)]
51-
pub enum StatementMessage<'a> {
52-
Statements(Vec<&'a [u8]>),
51+
pub enum StatementMessage {
52+
Statements(Vec<([u8; 32], Statement)>),
5353
ExplicitTopicAffinity(AffinityFilter),
5454
}
5555

@@ -210,14 +210,15 @@ const V2_TAG_AFFINITY: u8 = 0x01;
210210

211211
pub fn decode_statement_message(
212212
bytes: &[u8],
213-
) -> Result<StatementMessage<'_>, DecodeStatementMessageError> {
213+
) -> Result<StatementMessage, DecodeStatementMessageError> {
214214
if bytes.is_empty() {
215215
return Err(DecodeStatementMessageError::Empty);
216216
}
217217

218218
match bytes[0] {
219219
V2_TAG_STATEMENTS => {
220-
let stmts = extract_statement_bytes(&bytes[1..])?;
220+
let stmts = decode_statement_notification(&bytes[1..])
221+
.map_err(DecodeStatementMessageError::InvalidStatements)?;
221222
Ok(StatementMessage::Statements(stmts))
222223
}
223224
V2_TAG_AFFINITY => {
@@ -230,67 +231,27 @@ pub fn decode_statement_message(
230231
}
231232

232233
pub fn encode_statements_message(statements: &[&[u8]]) -> Vec<u8> {
234+
let tag_len = 1;
235+
let max_compact_len = 5;
233236
let total_len: usize = statements.iter().map(|s| s.len()).sum();
234-
let mut out = Vec::with_capacity(1 + 5 + total_len);
237+
let mut out = Vec::with_capacity(tag_len + max_compact_len + total_len);
235238
out.push(V2_TAG_STATEMENTS);
236239
out.extend_from_slice(crate::util::encode_scale_compact_usize(statements.len()).as_ref());
237240
for stmt in statements {
238-
out.extend_from_slice(crate::util::encode_scale_compact_usize(stmt.len()).as_ref());
239241
out.extend_from_slice(stmt);
240242
}
241243
out
242244
}
243245

244246
pub fn encode_topic_affinity_message(filter: &AffinityFilter) -> Vec<u8> {
247+
let tag_len = 1;
245248
let encoded = filter.encode_to_vec();
246-
let mut out = Vec::with_capacity(1 + encoded.len());
249+
let mut out = Vec::with_capacity(tag_len + encoded.len());
247250
out.push(V2_TAG_AFFINITY);
248251
out.extend_from_slice(&encoded);
249252
out
250253
}
251254

252-
fn extract_statement_bytes(data: &[u8]) -> Result<Vec<&[u8]>, DecodeStatementMessageError> {
253-
let (mut remaining, count) =
254-
crate::util::nom_scale_compact_usize::<nom::error::Error<&[u8]>>(data).map_err(|_| {
255-
DecodeStatementMessageError::InvalidStatements(DecodeStatementNotificationError(
256-
nom::error::ErrorKind::Fail,
257-
))
258-
})?;
259-
260-
if count > MAX_STATEMENTS_PER_NOTIFICATION {
261-
return Err(DecodeStatementMessageError::InvalidStatements(
262-
DecodeStatementNotificationError(nom::error::ErrorKind::TooLarge),
263-
));
264-
}
265-
266-
let mut statements = Vec::with_capacity(count);
267-
for _ in 0..count {
268-
let (rest, len) = crate::util::nom_scale_compact_usize::<nom::error::Error<&[u8]>>(
269-
remaining,
270-
)
271-
.map_err(|_| {
272-
DecodeStatementMessageError::InvalidStatements(DecodeStatementNotificationError(
273-
nom::error::ErrorKind::Fail,
274-
))
275-
})?;
276-
if rest.len() < len {
277-
return Err(DecodeStatementMessageError::InvalidStatements(
278-
DecodeStatementNotificationError(nom::error::ErrorKind::Eof),
279-
));
280-
}
281-
statements.push(&rest[..len]);
282-
remaining = &rest[len..];
283-
}
284-
285-
if !remaining.is_empty() {
286-
return Err(DecodeStatementMessageError::InvalidStatements(
287-
DecodeStatementNotificationError(nom::error::ErrorKind::NonEmpty),
288-
));
289-
}
290-
291-
Ok(statements)
292-
}
293-
294255
#[derive(Debug, derive_more::Display, derive_more::Error, Clone)]
295256
pub enum DecodeStatementMessageError {
296257
#[display("Empty V2 statement message")]
@@ -751,15 +712,56 @@ mod tests {
751712
match decoded {
752713
StatementMessage::Statements(stmts) => {
753714
assert_eq!(stmts.len(), 2);
754-
assert_eq!(stmts[0], encoded1.as_slice());
755-
assert_eq!(stmts[1], encoded2.as_slice());
715+
assert_eq!(stmts[0].1, statement1);
716+
assert_eq!(stmts[1].1, statement2);
756717
}
757718
_ => panic!("Expected Statements variant"),
758719
}
759720
}
760721

761722
#[test]
762-
fn v2_affinity_roundtrip() {
723+
fn v2_statements_encoding_snapshot() {
724+
let statement = Statement {
725+
proof: Some(Proof::OnChain {
726+
who: [42u8; 32],
727+
block_hash: [24u8; 32],
728+
event_index: 66,
729+
}),
730+
decryption_key: Some([0xde; 32]),
731+
expiry: 999,
732+
channel: Some([0xcc; 32]),
733+
topics: vec![[0x01; 32], [0x02; 32]],
734+
data: Some(vec![55, 99]),
735+
};
736+
737+
let stmt_bytes = encode_statement(&statement).unwrap();
738+
let v2_encoded = encode_statements_message(&[&stmt_bytes]);
739+
740+
let digest: [u8; 32] = blake2_rfc::blake2b::blake2b(32, &[], &v2_encoded)
741+
.as_bytes()
742+
.try_into()
743+
.unwrap();
744+
assert_eq!(
745+
digest,
746+
[
747+
44, 71, 235, 73, 238, 115, 6, 15, 128, 174, 159, 216, 166, 76, 26, 101, 28, 143,
748+
88, 21, 22, 128, 169, 62, 180, 19, 164, 234, 174, 210, 81, 105
749+
],
750+
"blake2_256 digest must match polkadot-sdk snapshot"
751+
);
752+
753+
let decoded = decode_statement_message(&v2_encoded).unwrap();
754+
match decoded {
755+
StatementMessage::Statements(stmts) => {
756+
assert_eq!(stmts.len(), 1);
757+
assert_eq!(stmts[0].1, statement);
758+
}
759+
_ => panic!("Expected Statements variant"),
760+
}
761+
}
762+
763+
#[test]
764+
fn v2_affinity_encoding_snapshot() {
763765
let topic1 = [0x01u8; 32];
764766
let topic2 = [0x02u8; 32];
765767
let topic3 = [0x03u8; 32];
@@ -769,8 +771,21 @@ mod tests {
769771
filter.insert(&topic2);
770772

771773
let encoded = encode_topic_affinity_message(&filter);
772-
let decoded = decode_statement_message(&encoded).unwrap();
773774

775+
let digest: [u8; 32] = blake2_rfc::blake2b::blake2b(32, &[], &encoded)
776+
.as_bytes()
777+
.try_into()
778+
.unwrap();
779+
assert_eq!(
780+
digest,
781+
[
782+
82, 59, 251, 163, 43, 156, 130, 249, 35, 214, 187, 99, 4, 105, 179, 131, 42, 117,
783+
191, 57, 160, 243, 233, 20, 204, 239, 62, 120, 55, 5, 234, 62
784+
],
785+
"blake2_256 digest must match polkadot-sdk snapshot"
786+
);
787+
788+
let decoded = decode_statement_message(&encoded).unwrap();
774789
let StatementMessage::ExplicitTopicAffinity(af) = decoded else {
775790
panic!("Expected ExplicitTopicAffinity variant");
776791
};

lib/src/network/service.rs

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2877,23 +2877,7 @@ where
28772877
}
28782878
codec::StatementProtocolVersion::V2 => {
28792879
match codec::decode_statement_message(&notification) {
2880-
Ok(codec::StatementMessage::Statements(stmts)) => {
2881-
let mut statements = Vec::with_capacity(stmts.len());
2882-
for raw in stmts {
2883-
let hash = codec::statement_hash(raw);
2884-
match codec::decode_statement(raw) {
2885-
Ok(s) => statements.push((hash, s)),
2886-
Err(err) => {
2887-
return Some(Event::ProtocolError {
2888-
error:
2889-
ProtocolError::BadStatementNotification(
2890-
err,
2891-
),
2892-
peer_id: self.peers[peer_index.0].clone(),
2893-
});
2894-
}
2895-
}
2896-
}
2880+
Ok(codec::StatementMessage::Statements(statements)) => {
28972881
if statements.is_empty() {
28982882
continue;
28992883
}
@@ -4119,7 +4103,6 @@ where
41194103
let Some(&peer_index) = self.peers_by_peer_id.get(target) else {
41204104
return Err(QueueNotificationError::NoConnection);
41214105
};
4122-
41234106
let chain_index = chain_id.0;
41244107

41254108
let (protocol, notification) = self
@@ -4159,7 +4142,6 @@ where
41594142
let Some(&peer_index) = self.peers_by_peer_id.get(target) else {
41604143
return Err(SendTopicAffinityError::NoConnection);
41614144
};
4162-
41634145
let chain_index = chain_id.0;
41644146

41654147
match self.find_statement_protocol_for_peer(peer_index, chain_index) {
@@ -4739,15 +4721,28 @@ pub enum Event<TConn> {
47394721
statements: Vec<([u8; 32], codec::Statement)>,
47404722
},
47414723

4724+
/// A statement protocol substream has been successfully negotiated with a peer.
4725+
///
4726+
/// Indicates which protocol version (V1 or V2) was agreed upon.
47424727
StatementProtocolConnected {
4728+
/// Identity of the remote peer.
47434729
peer_id: PeerId,
4730+
/// Index of the chain the substream relates to.
47444731
chain_id: ChainId,
4732+
/// Negotiated statement protocol version.
47454733
version: codec::StatementProtocolVersion,
47464734
},
47474735

4736+
/// Received a topic affinity bloom filter from a V2 peer.
4737+
///
4738+
/// The filter indicates which statement topics the peer is interested in.
4739+
/// Only statements matching the filter need to be sent to this peer.
47484740
StatementTopicAffinityReceived {
4741+
/// Identity of the remote peer.
47494742
peer_id: PeerId,
4743+
/// Index of the chain the affinity relates to.
47504744
chain_id: ChainId,
4745+
/// Bloom filter representing the peer's topic interests.
47514746
filter: codec::AffinityFilter,
47524747
},
47534748

light-base/src/json_rpc_service.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -116,9 +116,8 @@ pub struct Config<TPlat: PlatformRef> {
116116
/// Hash of the genesis block of the chain.
117117
pub genesis_block_hash: [u8; 32],
118118

119-
/// False positive rate for bloom filters used in statement topic affinity.
120-
/// `None` if the statement protocol is disabled.
121-
pub bloom_false_positive_rate: Option<f64>,
119+
/// Statement protocol configuration. `None` if the statement protocol is disabled.
120+
pub statement_protocol_config: Option<network_service::StatementProtocolConfig>,
122121
}
123122

124123
/// Creates a new JSON-RPC service with the given configuration.
@@ -157,7 +156,7 @@ pub fn service<TPlat: PlatformRef>(config: Config<TPlat>) -> Frontend<TPlat> {
157156
system_name: config.system_name,
158157
system_version: config.system_version,
159158
genesis_block_hash: config.genesis_block_hash,
160-
bloom_false_positive_rate: config.bloom_false_positive_rate,
159+
statement_protocol_config: config.statement_protocol_config,
161160
},
162161
requests_rx,
163162
responses_tx,

light-base/src/json_rpc_service/background.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,8 @@ pub(super) struct Config<TPlat: PlatformRef> {
8787
/// Hash of the genesis block of the chain.
8888
pub genesis_block_hash: [u8; 32],
8989

90-
pub bloom_false_positive_rate: Option<f64>,
90+
/// Statement protocol configuration. `None` if the statement protocol is disabled.
91+
pub statement_protocol_config: Option<network_service::StatementProtocolConfig>,
9192
}
9293

9394
/// Fields used to process JSON-RPC requests in the background.
@@ -118,7 +119,7 @@ struct Background<TPlat: PlatformRef> {
118119
/// Randomness used for various purposes, such as generating subscription IDs.
119120
randomness: ChaCha20Rng,
120121

121-
bloom_false_positive_rate: Option<f64>,
122+
statement_protocol_config: Option<network_service::StatementProtocolConfig>,
122123

123124
bloom_seed: u128,
124125

@@ -517,7 +518,7 @@ pub(super) async fn run<TPlat: PlatformRef>(
517518
config.platform.fill_random_bytes(&mut seed);
518519
seed
519520
}),
520-
bloom_false_positive_rate: config.bloom_false_positive_rate,
521+
statement_protocol_config: config.statement_protocol_config,
521522
bloom_seed: {
522523
let mut seed_bytes = [0u8; 16];
523524
config.platform.fill_random_bytes(&mut seed_bytes);
@@ -779,7 +780,7 @@ pub(super) async fn run<TPlat: PlatformRef>(
779780
if matches!(version, network_service::StatementProtocolVersion::V2) {
780781
me.v2_statement_peers.insert(peer_id.clone());
781782
if !me.statement_subscriptions.is_empty() {
782-
let fpr = me.bloom_false_positive_rate.unwrap_or(0.01);
783+
let fpr = me.statement_protocol_config.as_ref().expect("V2 peers require statement protocol; qed").false_positive_rate();
783784
let combined_filter = build_combined_affinity_filter(
784785
&me.statement_subscriptions,
785786
me.bloom_seed,
@@ -2926,7 +2927,7 @@ pub(super) async fn run<TPlat: PlatformRef>(
29262927
.insert(subscription_id.clone(), filter);
29272928

29282929
if !me.v2_statement_peers.is_empty() {
2929-
let fpr = me.bloom_false_positive_rate.unwrap_or(0.01);
2930+
let fpr = me.statement_protocol_config.as_ref().expect("V2 peers require statement protocol; qed").false_positive_rate();
29302931
let combined_filter = build_combined_affinity_filter(
29312932
&me.statement_subscriptions,
29322933
me.bloom_seed,
@@ -2955,7 +2956,7 @@ pub(super) async fn run<TPlat: PlatformRef>(
29552956
let existed = me.statement_subscriptions.remove(&subscription).is_some();
29562957

29572958
if existed && !me.v2_statement_peers.is_empty() {
2958-
let fpr = me.bloom_false_positive_rate.unwrap_or(0.01);
2959+
let fpr = me.statement_protocol_config.as_ref().expect("V2 peers require statement protocol; qed").false_positive_rate();
29592960
let combined_filter = build_combined_affinity_filter(
29602961
&me.statement_subscriptions,
29612962
me.bloom_seed,

light-base/src/lib.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -672,11 +672,6 @@ impl<TPlat: platform::PlatformRef, TChain> Client<TPlat, TChain> {
672672
}
673673
};
674674

675-
let bloom_false_positive_rate = config
676-
.statement_protocol_config
677-
.as_ref()
678-
.map(|c| c.false_positive_rate());
679-
680675
// Start the services of the chain to add, or grab the services if they already exist.
681676
let (services, log_name) = match chains_by_key.entry(new_chain_key.clone()) {
682677
Entry::Occupied(mut entry) => {
@@ -702,6 +697,7 @@ impl<TPlat: platform::PlatformRef, TChain> Client<TPlat, TChain> {
702697
);
703698

704699
let statement_protocol_config = config.statement_protocol_config;
700+
let statement_protocol_config_for_rpc = statement_protocol_config.clone();
705701

706702
let config = match (&relay_chain, &chain_information) {
707703
(Some((relay_chain, para_id, _)), Some(chain_information)) => {
@@ -956,7 +952,7 @@ impl<TPlat: platform::PlatformRef, TChain> Client<TPlat, TChain> {
956952
system_name: self.platform.client_name().into_owned(),
957953
system_version: self.platform.client_version().into_owned(),
958954
genesis_block_hash,
959-
bloom_false_positive_rate,
955+
statement_protocol_config: statement_protocol_config_for_rpc,
960956
});
961957

962958
Some(frontend)

light-base/src/sync_service/standalone.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -811,10 +811,13 @@ pub(super) async fn start_standalone_chain<TPlat: PlatformRef>(
811811

812812
WakeUpReason::NetworkEvent(network_service::Event::StatementsNotification {
813813
..
814-
}) => {}
815-
WakeUpReason::NetworkEvent(network_service::Event::StatementProtocolConnected {
814+
})
815+
| WakeUpReason::NetworkEvent(network_service::Event::StatementProtocolConnected {
816816
..
817-
}) => {}
817+
}) => {
818+
// Statement store protocol events are handled by the JSON-RPC service.
819+
// The standalone sync service doesn't need to react to them.
820+
}
818821

819822
WakeUpReason::MustSubscribeNetworkEvents => {
820823
debug_assert!(task.from_network_service.is_none());

wasm-node/javascript/src/internals/client.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -521,7 +521,7 @@ export function start(options: ClientOptions, wasmModule: SmoldotBytecode | Prom
521521
if (statementStoreMaxSeenStatements > 0xffffffff) {
522522
statementStoreMaxSeenStatements = 0xffffffff;
523523
}
524-
statementStoreFalsePositiveRate = options.statementStore.falsePositiveRate === undefined ? 0.01 : options.statementStore.falsePositiveRate;
524+
statementStoreFalsePositiveRate = options.statementStore.falsePositiveRate;
525525
if (statementStoreFalsePositiveRate <= 0.0 || statementStoreFalsePositiveRate >= 1.0 || isNaN(statementStoreFalsePositiveRate)) {
526526
throw new AddChainError("Invalid value for `statementStore.falsePositiveRate`");
527527
}

wasm-node/javascript/src/public-types.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -464,6 +464,6 @@ export interface AddChainOptions {
464464
*/
465465
statementStore?: {
466466
maxSeenStatements?: number,
467-
falsePositiveRate?: number,
467+
falsePositiveRate: number,
468468
}
469469
}

0 commit comments

Comments
 (0)