Skip to content

Commit bddc250

Browse files
dapplioneserilev
authored andcommitted
Unsubscribe blob topics at Fulu fork (sigp#6932)
Addresses sigp#6854. PeerDAS requires unsubscribing a Gossip topic at a fork boundary. This is not possible with our current topic machinery. Instead of defining which topics have to be **added** at a given fork, we define the complete set of topics at a given fork. The new start of the show and key function is: ```rust pub fn core_topics_to_subscribe<E: EthSpec>( fork_name: ForkName, opts: &TopicConfig, spec: &ChainSpec, ) -> Vec<GossipKind> { // ... if fork_name.deneb_enabled() && !fork_name.fulu_enabled() { // All of deneb blob topics are core topics for i in 0..spec.blob_sidecar_subnet_count(fork_name) { topics.push(GossipKind::BlobSidecar(i)); } } // ... } ``` `core_topics_to_subscribe` only returns the blob topics if `fork < Fulu`. Then at the fork boundary, we subscribe with the new fork digest to `core_topics_to_subscribe(next_fork)`, which excludes the blob topics. I added `is_fork_non_core_topic` to carry on to the next fork the aggregator topics for attestations and sync committee messages. This approach is future-proof if those topics ever become fork-dependent. Closes sigp#6854
1 parent e1d5114 commit bddc250

File tree

6 files changed

+231
-199
lines changed

6 files changed

+231
-199
lines changed

beacon_node/lighthouse_network/src/service/mod.rs

Lines changed: 49 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,8 @@ use crate::rpc::{
1414
RequestType, ResponseTermination, RpcErrorResponse, RpcResponse, RpcSuccessResponse, RPC,
1515
};
1616
use crate::types::{
17-
attestation_sync_committee_topics, fork_core_topics, subnet_from_topic_hash, GossipEncoding,
18-
GossipKind, GossipTopic, SnappyTransform, Subnet, SubnetDiscovery, ALTAIR_CORE_TOPICS,
19-
BASE_CORE_TOPICS, CAPELLA_CORE_TOPICS, LIGHT_CLIENT_GOSSIP_TOPICS,
17+
all_topics_at_fork, core_topics_to_subscribe, is_fork_non_core_topic, subnet_from_topic_hash,
18+
GossipEncoding, GossipKind, GossipTopic, SnappyTransform, Subnet, SubnetDiscovery,
2019
};
2120
use crate::EnrExt;
2221
use crate::Eth2Enr;
@@ -280,14 +279,39 @@ impl<E: EthSpec> Network<E> {
280279
// Set up a scoring update interval
281280
let update_gossipsub_scores = tokio::time::interval(params.decay_interval);
282281

283-
let max_topics = ctx.chain_spec.attestation_subnet_count as usize
284-
+ SYNC_COMMITTEE_SUBNET_COUNT as usize
285-
+ ctx.chain_spec.blob_sidecar_subnet_count_max() as usize
286-
+ ctx.chain_spec.data_column_sidecar_subnet_count as usize
287-
+ BASE_CORE_TOPICS.len()
288-
+ ALTAIR_CORE_TOPICS.len()
289-
+ CAPELLA_CORE_TOPICS.len() // 0 core deneb and electra topics
290-
+ LIGHT_CLIENT_GOSSIP_TOPICS.len();
282+
let current_and_future_forks = ForkName::list_all().into_iter().filter_map(|fork| {
283+
if fork >= ctx.fork_context.current_fork() {
284+
ctx.fork_context
285+
.to_context_bytes(fork)
286+
.map(|fork_digest| (fork, fork_digest))
287+
} else {
288+
None
289+
}
290+
});
291+
292+
let all_topics_for_forks = current_and_future_forks
293+
.map(|(fork, fork_digest)| {
294+
all_topics_at_fork::<E>(fork, &ctx.chain_spec)
295+
.into_iter()
296+
.map(|topic| {
297+
Topic::new(GossipTopic::new(
298+
topic,
299+
GossipEncoding::default(),
300+
fork_digest,
301+
))
302+
.into()
303+
})
304+
.collect::<Vec<TopicHash>>()
305+
})
306+
.collect::<Vec<_>>();
307+
308+
// For simplicity find the fork with the most individual topics and assume all forks
309+
// have the same topic count
310+
let max_topics_at_any_fork = all_topics_for_forks
311+
.iter()
312+
.map(|topics| topics.len())
313+
.max()
314+
.expect("each fork has at least 5 hardcoded core topics");
291315

292316
let possible_fork_digests = ctx.fork_context.all_fork_digests();
293317
let filter = gossipsub::MaxCountSubscriptionFilter {
@@ -297,9 +321,9 @@ impl<E: EthSpec> Network<E> {
297321
SYNC_COMMITTEE_SUBNET_COUNT,
298322
),
299323
// during a fork we subscribe to both the old and new topics
300-
max_subscribed_topics: max_topics * 4,
324+
max_subscribed_topics: max_topics_at_any_fork * 4,
301325
// 424 in theory = (64 attestation + 4 sync committee + 7 core topics + 9 blob topics + 128 column topics) * 2
302-
max_subscriptions_per_request: max_topics * 2,
326+
max_subscriptions_per_request: max_topics_at_any_fork * 2,
303327
};
304328

305329
// If metrics are enabled for libp2p build the configuration
@@ -332,17 +356,9 @@ impl<E: EthSpec> Network<E> {
332356
// If we are using metrics, then register which topics we want to make sure to keep
333357
// track of
334358
if ctx.libp2p_registry.is_some() {
335-
let topics_to_keep_metrics_for = attestation_sync_committee_topics::<E>()
336-
.map(|gossip_kind| {
337-
Topic::from(GossipTopic::new(
338-
gossip_kind,
339-
GossipEncoding::default(),
340-
enr_fork_id.fork_digest,
341-
))
342-
.into()
343-
})
344-
.collect::<Vec<TopicHash>>();
345-
gossipsub.register_topics_for_metrics(topics_to_keep_metrics_for);
359+
for topics in all_topics_for_forks {
360+
gossipsub.register_topics_for_metrics(topics);
361+
}
346362
}
347363

348364
(gossipsub, update_gossipsub_scores)
@@ -700,38 +716,26 @@ impl<E: EthSpec> Network<E> {
700716

701717
/// Subscribe to all required topics for the `new_fork` with the given `new_fork_digest`.
702718
pub fn subscribe_new_fork_topics(&mut self, new_fork: ForkName, new_fork_digest: [u8; 4]) {
703-
// Subscribe to existing topics with new fork digest
719+
// Re-subscribe to non-core topics with the new fork digest
704720
let subscriptions = self.network_globals.gossipsub_subscriptions.read().clone();
705721
for mut topic in subscriptions.into_iter() {
706-
topic.fork_digest = new_fork_digest;
707-
self.subscribe(topic);
722+
if is_fork_non_core_topic(&topic, new_fork) {
723+
topic.fork_digest = new_fork_digest;
724+
self.subscribe(topic);
725+
}
708726
}
709727

710728
// Subscribe to core topics for the new fork
711-
for kind in fork_core_topics::<E>(
712-
&new_fork,
713-
&self.fork_context.spec,
729+
for kind in core_topics_to_subscribe::<E>(
730+
new_fork,
714731
&self.network_globals.as_topic_config(),
732+
&self.fork_context.spec,
715733
) {
716734
let topic = GossipTopic::new(kind, GossipEncoding::default(), new_fork_digest);
717735
self.subscribe(topic);
718736
}
719737

720-
// TODO(das): unsubscribe from blob topics at the Fulu fork
721-
722-
// Register the new topics for metrics
723-
let topics_to_keep_metrics_for = attestation_sync_committee_topics::<E>()
724-
.map(|gossip_kind| {
725-
Topic::from(GossipTopic::new(
726-
gossip_kind,
727-
GossipEncoding::default(),
728-
new_fork_digest,
729-
))
730-
.into()
731-
})
732-
.collect::<Vec<TopicHash>>();
733-
self.gossipsub_mut()
734-
.register_topics_for_metrics(topics_to_keep_metrics_for);
738+
// Already registered all possible gossipsub topics for metrics
735739
}
736740

737741
/// Unsubscribe from all topics that doesn't have the given fork_digest

beacon_node/lighthouse_network/src/types/globals.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,8 @@ impl<E: EthSpec> NetworkGlobals<E> {
187187
/// Returns the TopicConfig to compute the set of Gossip topics for a given fork
188188
pub fn as_topic_config(&self) -> TopicConfig {
189189
TopicConfig {
190+
enable_light_client_server: self.config.enable_light_client_server,
191+
subscribe_all_subnets: self.config.subscribe_all_subnets,
190192
subscribe_all_data_column_subnets: self.config.subscribe_all_data_column_subnets,
191193
sampling_subnets: &self.sampling_subnets,
192194
}

beacon_node/lighthouse_network/src/types/mod.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ pub use pubsub::{PubsubMessage, SnappyTransform};
1616
pub use subnet::{Subnet, SubnetDiscovery};
1717
pub use sync_state::{BackFillState, SyncState};
1818
pub use topics::{
19-
attestation_sync_committee_topics, core_topics_to_subscribe, fork_core_topics,
20-
subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, TopicConfig,
21-
ALTAIR_CORE_TOPICS, BASE_CORE_TOPICS, CAPELLA_CORE_TOPICS, LIGHT_CLIENT_GOSSIP_TOPICS,
19+
all_topics_at_fork, core_topics_to_subscribe, is_fork_non_core_topic, subnet_from_topic_hash,
20+
GossipEncoding, GossipKind, GossipTopic, TopicConfig,
2221
};

0 commit comments

Comments
 (0)