
Conversation

@alexggh (Contributor) commented Dec 18, 2025

Work in progress

Description

Implements the changes proposed in #10452, limiting the API surface to two functions: submit and subscribe_statement.

Submit changes

For submission, the only change is implementing the logic for the new expiry field: statements with an expiration timestamp lower than the current time get rejected.

Subscribe changes

For subscriptions, the approach is to have a configurable number of workers that keep track of all subscriptions. Each time a new subscription arrives, it is assigned to one of the workers in round-robin fashion.

When a new statement gets accepted by the statement store, it notifies all workers about the statement. The workers then evaluate all subscription filters they are assigned and notify each subscriber accordingly.

Remaining work

  • Add more tests.
  • Evaluate performance under load and port statement_store_bench.rs to the subscribe API. Done: ported, and performance on the benchmark stays the same, because the bottlenecks are elsewhere and are being addressed.
[2026-01-07T17:18:51Z INFO  tests::zombie_ci::statement_store_bench] Participants: 49998, each sent: 6, received: 0
[2026-01-07T17:18:51Z INFO  tests::zombie_ci::statement_store_bench] Summary        min       avg       max
[2026-01-07T17:18:51Z INFO  tests::zombie_ci::statement_store_bench]  time, s         8        19        24
[2026-01-07T17:18:51Z INFO  tests::zombie_ci::statement_store_bench]  retries         0         0         0
  • Evaluate and set proper RPC limits. The normal RPC limits apply here; we just need to make sure RPC nodes run with limits high enough that every user can connect to the node at the same time.
  • Periodically scan existing statements and remove expired ones.
  • Plan the deployment to minimise the impact on people prototyping with the old APIs.
    • This modifies the statement format (expiry field); runtime and node need to run the latest version.

Local testing

To obtain a local node with the new APIs available run:

cargo build -p polkadot -p polkadot-parachain-bin  --bin polkadot --bin polkadot-prepare-worker --bin polkadot-execute-worker --bin polkadot-parachain  --features fast-runtime;
 
ZOMBIE_PROVIDER=native cargo test -p cumulus-zombienet-sdk-tests --features zombie-ci zombie_ci::statement_store::statement_store

The RPC addresses are visible in the output:

charlie: direct link (pjs) https://polkadot.js.org/apps/?rpc=ws://127.0.0.1:26997#/explorer
dave: direct link (pjs) https://polkadot.js.org/apps/?rpc=ws://127.0.0.1:12623#/explorer

AndreiEres and others added 28 commits November 25, 2025 16:04
Signed-off-by: Alexandru Gheorghe <[email protected]>
@alexggh added the T0-node label Jan 12, 2026
@AndreiEres (Contributor) left a comment:

Great work, Alex!
Looks good, left a few questions.

@s0me0ne-unkn0wn (Contributor) left a comment:

I'm just skimming it for now. The API itself looks good and quite intuitive. I'm yet to understand the mechanics happening inside.

@P1sar commented Jan 14, 2026:

Closes: #10568, #10457

log::info!("Statement store test passed");
log::info!("Keeping network alive");

tokio::time::sleep(Duration::from_hours(24)).await;
Contributor:
Why do we need to hang the test for 24h?

Contributor Author:
It is there so the Apps folks have a node setup they can run locally; will remove it before merge.

fn subscribe_statement(
&self,
pending: PendingSubscriptionSink,
_ext: &Extensions,
Comment:

Would it make sense to reuse an existing subscription routine if the sessionID and/or address with topic filter is the same?
Otherwise a malicious actor can create an unlimited number of identical subscriptions.

@alexggh (Contributor Author) commented Jan 22, 2026:
Both the number of connections and the number of subscriptions per connection accepted by a node are already limited and configurable with the following CLI arguments: --rpc-max-subscriptions-per-connection and --rpc-max-connections.

I think that covers your attack scenario, or am I missing something?

@P1sar commented Jan 22, 2026:

--rpc-max-subscriptions-per-connection caps the number of subscriptions per connection, which helps protect the network stack. But I’m still wondering whether this can be used as a DDoS vector against a node at the application layer.

The default/typical limit is quite high (e.g., ~100k), and in our case the bottleneck isn’t the number of WebSocket connections, it’s the subscription handling itself. A single client/browser can still legitimately create ~100k statement subscriptions using the same topic, which can overwhelm our logical workers / subscription-processing pipeline.

So the concern here isn’t connection count, but application-logic load: one connection can create an excessive number of subscriptions and effectively exhaust CPU/memory/worker capacity.

Contributor Author:

The default/typical limit is quite high (e.g., ~100k),

Not sure I understand where the 100k is coming from; the current polkadot-parachain configuration is 1024:

      --rpc-max-subscriptions-per-connection <RPC_MAX_SUBSCRIPTIONS_PER_CONNECTION>
          Set the maximum concurrent subscriptions per connection
          
          [default: 1024]

Which is not that high. If a server wants to protect itself, it can set that to a lower number to reduce the risk. My thinking is that this is a generic mechanism that already exists, so I don't think we should implement one just for our subscription API.

Comment:

But I am not talking about rate limiting. My suggestion is to reuse subscription routines if they are subscribed to the same topics, e.g. if 100k clients subscribe to topicA we can have only one routine that serves all 100k subscriptions.

@alexggh (Contributor Author) commented Jan 23, 2026:

But I am not talking about rate limiting. My suggestion is to reuse subscription routines if they are subscribed to the same topics, e.g. if 100k clients subscribe to topicA we can have only one routine that serves all 100k subscriptions.

Ok, the task here is just for pushing the data towards the subscription stream; it uses the common pattern I saw in polkadot-sdk, and as far as I can tell these tasks are really cheap. The main grunt of the work, determining whether a statement matches the filters of the subscriptions, is done in the subscription workers here https://github.com/paritytech/polkadot-sdk/pull/10690/files#diff-c3deb7e86e76f267c5ac9191aa895ddcd46f7807ae858e84592ad3d3c6c169b3R92, and those are limited in number.

Member:

The problem is that most of our other RPCs are already easily DoS-able. At some point we had someone working on this, but I think that effort stopped.

Comment on lines +53 to +64
/// Topic filter for statement subscriptions.
#[derive(Clone, Debug)]
pub enum CheckedTopicFilter {
/// Matches all topics.
Any,
/// Matches only statements including all of the given topics.
/// Bytes are expected to be a 32-byte topic. Up to `4` topics can be provided.
MatchAll(HashSet<Topic>),
/// Matches statements including any of the given topics.
/// Bytes are expected to be a 32-byte topic. Up to `128` topics can be provided.
MatchAny(HashSet<Topic>),
}
Member:

Why don't we directly use BoundedVec<Topic> above? Then we don't need to duplicate this.

Contributor Author:

The pre-processing into a HashSet is still needed for the match function, so even using BoundedVec we would still need a separate structure; I would just save the length check.

But, I'm a bit reluctant to change the RPC API parameter at this point, since I know @ERussel is already working on using the new API.

-       MatchAll(Vec<Bytes>),
+       MatchAll(BoundedVec<Bytes, ConstU32<{ MAX_TOPICS as u32 }>>),

-       MatchAny(Vec<Bytes>),
+       MatchAny(BoundedVec<Bytes, ConstU32<{ MAX_ANY_TOPICS as u32 }>>)

@ERussel how much friction would changes like this create on your end?

Comment:

I don't see any problems on the client side with this improvement

@alexggh (Contributor Author) commented Jan 29, 2026:

Done, updated to use BoundedVec. I couldn't remove the CheckedTopicFilter because it does some other validations as well.

Member:

@alexggh what other validations does it do? I just see the check that a topic is 32 bytes. But Topic being an array of 32 bytes, decoding should only accept exactly 32 bytes, not less or more?

}

/// Returns decryption key if any.
#[allow(deprecated)]
Member:

I don't think anyone is using these methods, and they can be removed directly.

Contributor Author:

We discussed this internally; since we are not sure about the future of this field, we agreed to keep the field and implementation around.


@paritytech-workflow-stopper commented:

All GitHub workflows were cancelled due to the failure of one of the required jobs.
Failed workflow url: https://github.com/paritytech/polkadot-sdk/actions/runs/21486888278
Failed job name: fmt

@@ -0,0 +1,166 @@
use super::*;
Member:

Missing license header.

Comment on lines +112 to +116
spawn_subscription_task(&self.executor, async {
PendingSubscription::from(pending)
.pipe_from_stream(subscription_stream, BoundedVecDeque::new(128))
.await;
});
Member:

Suggested change
spawn_subscription_task(&self.executor, async {
PendingSubscription::from(pending)
.pipe_from_stream(subscription_stream, BoundedVecDeque::new(128))
.await;
});
spawn_subscription_task(&self.executor,
PendingSubscription::from(pending)
.pipe_from_stream(subscription_stream, BoundedVecDeque::new(128))
);

mut f: impl FnMut(&Hash) -> Result<()>,
) -> Result<()> {
let key_set = self.by_dec_key.get(&key);
if key_set.map_or(0, |s| s.len()) == 0 {
Member:

Suggested change
if key_set.map_or(0, |s| s.len()) == 0 {
if key_set.map_or(true, |s| s.is_empty()) {

Comment on lines +315 to +319
let key_set = self.by_dec_key.get(&key);
if key_set.map_or(0, |s| s.len()) == 0 {
// Key does not exist in the index.
return Ok(())
}
Member:

Suggested change
let key_set = self.by_dec_key.get(&key);
if key_set.map_or(0, |s| s.len()) == 0 {
// Key does not exist in the index.
return Ok(())
}
let Some(key_set) = self.by_dec_key.get(&key).filter(|k| !k.is_empty()) else { return Ok(()) };

Comment on lines +92 to +103
for _ in 0..num_matcher_workers {
let (subscription_matcher_sender, subscription_matcher_receiver) =
async_channel::bounded(MATCHERS_TASK_CHANNEL_BUFFER_SIZE);
subscriptions_matchers_senders.push(subscription_matcher_sender);
task_spawner.spawn_blocking(
"statement-store-subscription-filters",
Some("statement-store"),
Box::pin(async move {
let mut subscriptions = SubscriptionsInfo::new();
log::debug!(
target: LOG_TARGET,
"Started statement subscription matcher task"
Member:

Suggested change
for _ in 0..num_matcher_workers {
let (subscription_matcher_sender, subscription_matcher_receiver) =
async_channel::bounded(MATCHERS_TASK_CHANNEL_BUFFER_SIZE);
subscriptions_matchers_senders.push(subscription_matcher_sender);
task_spawner.spawn_blocking(
"statement-store-subscription-filters",
Some("statement-store"),
Box::pin(async move {
let mut subscriptions = SubscriptionsInfo::new();
log::debug!(
target: LOG_TARGET,
"Started statement subscription matcher task"
for task in 0..num_matcher_workers {
let (subscription_matcher_sender, subscription_matcher_receiver) =
async_channel::bounded(MATCHERS_TASK_CHANNEL_BUFFER_SIZE);
subscriptions_matchers_senders.push(subscription_matcher_sender);
task_spawner.spawn_blocking(
"statement-store-subscription-filters",
Some("statement-store"),
Box::pin(async move {
let mut subscriptions = SubscriptionsInfo::new();
log::debug!(
target: LOG_TARGET,
"Started statement subscription matcher task: {task}"

// Expected when the subscription manager is dropped at shutdown.
log::error!(
target: LOG_TARGET,
"Statement subscription matcher channel closed"
Member:

Suggested change
"Statement subscription matcher channel closed"
"Statement subscription matcher channel closed: {task}"

Comment on lines +220 to +233
.insert(subscription_info.seq_id, subscription_info.clone());
},
CheckedTopicFilter::MatchAll(topics) =>
for topic in topics {
self.subscriptions_match_all_by_topic.entry(*topic).or_default()
[topics.len() - 1]
.insert(subscription_info.seq_id, subscription_info.clone());
},
CheckedTopicFilter::MatchAny(topics) =>
for topic in topics {
self.subscriptions_match_any_by_topic
.entry(*topic)
.or_default()
.insert(subscription_info.seq_id, subscription_info.clone());
Member:

Suggested change
.insert(subscription_info.seq_id, subscription_info.clone());
},
CheckedTopicFilter::MatchAll(topics) =>
for topic in topics {
self.subscriptions_match_all_by_topic.entry(*topic).or_default()
[topics.len() - 1]
.insert(subscription_info.seq_id, subscription_info.clone());
},
CheckedTopicFilter::MatchAny(topics) =>
for topic in topics {
self.subscriptions_match_any_by_topic
.entry(*topic)
.or_default()
.insert(subscription_info.seq_id, subscription_info.clone());
.insert(subscription_info.seq_id, subscription_info);
},
CheckedTopicFilter::MatchAll(topics) =>
for topic in topics {
self.subscriptions_match_all_by_topic.entry(*topic).or_default()
[topics.len() - 1]
.insert(subscription_info.seq_id, subscription_info);
},
CheckedTopicFilter::MatchAny(topics) =>
for topic in topics {
self.subscriptions_match_any_by_topic
.entry(*topic)
.or_default()
.insert(subscription_info.seq_id, subscription_info);

// Check all combinations of topics in the statement to find matching subscriptions.
// This works well because the maximum allowed topics is small (MAX_TOPICS = 4).
for num_topics_to_check in 1..=num_topics {
for topics_combination in statement.topics().iter().combinations(num_topics_to_check) {
Member:

Why do we need the combinations? We are anyway iterating all the topics. Could we not just use statement.topics() below to find the topic with the fewest?

) -> std::task::Poll<Option<Self::Item>> {
self.rx.poll_next_unpin(cx)
}
}
Member:

Suggested change
}
}


Labels

T0-node This PR/Issue is related to the topic “node”.


Development

Successfully merging this pull request may close these issues.

7 participants