-
Notifications
You must be signed in to change notification settings - Fork 213
feat(node): Kona Light CL #3245
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -1,4 +1,5 @@ | ||||||||||||||
| use alloy_rpc_types_engine::JwtSecret; | ||||||||||||||
| use kona_node_service::DerivationDelegateConfig; | ||||||||||||||
| use std::path::PathBuf; | ||||||||||||||
| use url::Url; | ||||||||||||||
|
|
||||||||||||||
|
|
@@ -131,3 +132,18 @@ impl Default for L2ClientArgs { | |||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| /// L2 derivation delegate connection arguments. | ||||||||||||||
| #[derive(Clone, Debug, Default, clap::Args)] | ||||||||||||||
| pub struct DerivationDelegateArgs { | ||||||||||||||
| /// The source must be an OP Stack L2 CL RPC exposing optimism_syncStatus. | ||||||||||||||
| #[arg(long, visible_alias = "l2.follow.source", env = "KONA_NODE_L2_FOLLOW_SOURCE")] | ||||||||||||||
| pub l2_follow_source: Option<Url>, | ||||||||||||||
|
Comment on lines
+139
to
+141
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we make this any clearer to prevent misconfiguration? Maybe something like this?
Suggested change
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed, would even love to add the URL to the design doc if possible
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The reason I chose the flag name l2.follow.source is to leave room for a small namespace around how the CL follows an external L2 source, e.g.:
This mirrors the existing naming in op-node, which already uses the l2.follow.* prefix, so I was trying to stay consistent with established conventions. Conceptually, the CL is "following" an external L2 derivation source, which is why follow felt like the right abstraction to me. That said, I understand the concern about misconfiguration and ambiguity, especially since this is an RPC URL and not just an abstract source. Open to tightening the name if we think the explicitness is worth the verbosity, and I agree that adding the concrete URL to the design doc would help reduce confusion regardless of the final flag name. |
||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| impl DerivationDelegateArgs { | ||||||||||||||
| /// Builds the derivation delegate configuration if an L2 CL URL was provided. | ||||||||||||||
| pub fn config(self) -> Option<DerivationDelegateConfig> { | ||||||||||||||
| self.l2_follow_source.map(|url| DerivationDelegateConfig { l2_cl_url: url }) | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
| Original file line number | Diff line number | Diff line change | ||
|---|---|---|---|---|
|
|
@@ -4,52 +4,167 @@ use crate::{ | |||
| ConsolidateTaskError, EngineClient, EngineState, EngineTaskExt, SynchronizeTask, | ||||
| state::EngineSyncStateUpdate, task_queue::build_and_seal, | ||||
| }; | ||||
| use alloy_rpc_types_eth::Block; | ||||
| use async_trait::async_trait; | ||||
| use derive_more::Constructor; | ||||
| use kona_genesis::RollupConfig; | ||||
| use kona_protocol::{L2BlockInfo, OpAttributesWithParent}; | ||||
| use op_alloy_rpc_types::Transaction; | ||||
| use std::{sync::Arc, time::Instant}; | ||||
|
|
||||
| /// Input for consolidation - either derived attributes or safe L2 block | ||||
| #[derive(Debug, Clone)] | ||||
| pub enum ConsolidateInput { | ||||
| /// Consolidate based on derived attributes. | ||||
| Attributes(Box<OpAttributesWithParent>), | ||||
| /// Derivation Delegation: consolidate based on safe L2 block info. | ||||
| BlockInfo(L2BlockInfo), | ||||
| } | ||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of making it an enum, I'd make two structures and have them implement the same trait. The parent struct (that holds the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I attempted this approach. So making the pub struct ConsolidateTask<EngineClient_: EngineClient> {
/// The engine client.
pub client: Arc<EngineClient_>,
/// The [`RollupConfig`].
pub cfg: Arc<RollupConfig>,
/// The input for consolidation (either attributes or block info).
pub input: ConsolidateInput,
}So this struct will receive another trait like pub struct ConsolidateTask<EngineClient_: EngineClient, ConsolidateInput_: ConsolidateInput> {
/// The engine client.
pub client: Arc<EngineClient_>,
/// The [`RollupConfig`].
pub cfg: Arc<RollupConfig>,
/// The input for consolidation (either attributes or block info).
pub input: ConsolidateInput_,
}The tasks are consumed at
pub fn enqueue(&mut self, task: EngineTask<EngineClient_>) {
self.tasks.push(task);
self.task_queue_length.send_replace(self.tasks.len());
}so if we add an additional trait, my understanding is that all other tasks must be aware of this type. Are there any clear methods to use the additional trait types in this case?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah I suppose that this would entail that Let's leave it that way for now. I will revisit in the future and try to simplify this structure |
||||
|
|
||||
| impl From<L2BlockInfo> for ConsolidateInput { | ||||
| fn from(v: L2BlockInfo) -> Self { | ||||
| Self::BlockInfo(v) | ||||
| } | ||||
| } | ||||
|
|
||||
| impl From<OpAttributesWithParent> for ConsolidateInput { | ||||
| fn from(v: OpAttributesWithParent) -> Self { | ||||
| Self::Attributes(Box::new(v)) | ||||
| } | ||||
| } | ||||
|
|
||||
| impl ConsolidateInput { | ||||
| /// Returns the block number for this consolidation input. | ||||
| const fn l2_block_number(&self) -> u64 { | ||||
| match self { | ||||
| Self::Attributes(attributes) => attributes.block_number(), | ||||
| Self::BlockInfo(info) => info.block_info.number, | ||||
| } | ||||
| } | ||||
|
|
||||
| /// Checks if the block is consistent with this consolidation input. | ||||
| fn is_consistent_with_block(&self, cfg: &RollupConfig, block: &Block<Transaction>) -> bool { | ||||
| match self { | ||||
| Self::Attributes(attributes) => { | ||||
| crate::AttributesMatch::check(cfg, attributes, block).is_match() | ||||
| } | ||||
| Self::BlockInfo(info) => block.header.hash == info.block_info.hash, | ||||
| } | ||||
| } | ||||
|
|
||||
| /// Returns true if this is `Attributes` and `attributes.is_last_in_span` is true. | ||||
| const fn is_attributes_last_in_span(&self) -> bool { | ||||
| matches!( | ||||
| self, | ||||
| Self::Attributes(attributes) | ||||
| if attributes.is_last_in_span | ||||
| ) | ||||
| } | ||||
| } | ||||
|
|
||||
| /// The [`ConsolidateTask`] attempts to consolidate the engine state | ||||
| /// using the specified payload attributes and the oldest unsafe head. | ||||
| /// | ||||
| /// If consolidation fails, payload attributes processing is attempted using `build_and_seal`. | ||||
| #[derive(Debug, Clone, Constructor)] | ||||
| /// using the specified payload attributes or block info. | ||||
| #[derive(Debug, Clone)] | ||||
| pub struct ConsolidateTask<EngineClient_: EngineClient> { | ||||
| /// The engine client. | ||||
| pub client: Arc<EngineClient_>, | ||||
| /// The [`RollupConfig`]. | ||||
| pub cfg: Arc<RollupConfig>, | ||||
| /// The [`OpAttributesWithParent`] to instruct the execution layer to build. | ||||
| pub attributes: OpAttributesWithParent, | ||||
| /// Whether or not the payload was derived, or created by the sequencer. | ||||
| pub is_attributes_derived: bool, | ||||
| /// The input for consolidation (either attributes or block info). | ||||
| pub input: ConsolidateInput, | ||||
| } | ||||
|
|
||||
| impl<EngineClient_: EngineClient> ConsolidateTask<EngineClient_> { | ||||
| /// This is used when the [`ConsolidateTask`] fails to consolidate the engine state. | ||||
| /// Creates a new [`ConsolidateTask`] with the specified input | ||||
| pub const fn new( | ||||
| client: Arc<EngineClient_>, | ||||
| cfg: Arc<RollupConfig>, | ||||
| input: ConsolidateInput, | ||||
| ) -> Self { | ||||
| Self { client, cfg, input } | ||||
| } | ||||
|
|
||||
| /// This is used when the [`ConsolidateTask`] fails to consolidate the engine state | ||||
| async fn execute_build_and_seal_tasks( | ||||
| &self, | ||||
| state: &mut EngineState, | ||||
| attributes: &OpAttributesWithParent, | ||||
| ) -> Result<(), ConsolidateTaskError> { | ||||
| build_and_seal( | ||||
| state, | ||||
| self.client.clone(), | ||||
| build_and_seal(state, self.client.clone(), self.cfg.clone(), attributes.clone(), true) | ||||
| .await?; | ||||
|
|
||||
| Ok(()) | ||||
| } | ||||
|
|
||||
| /// This provides symmetric fallback behavior to with build_and_seal. | ||||
| async fn reconcile_to_safe_head( | ||||
| &self, | ||||
| state: &mut EngineState, | ||||
| safe_l2: &L2BlockInfo, | ||||
| ) -> Result<(), ConsolidateTaskError> { | ||||
| warn!( | ||||
| target: "engine", | ||||
| safe_l2 = %safe_l2, | ||||
| "Apply safe head" | ||||
| ); | ||||
|
|
||||
| let fcu_start = Instant::now(); | ||||
|
|
||||
| // We intentionally set unsafe_head and cross_unsafe_head to safe_l2 to ensure the | ||||
| // engine observes a self-consistent head state. This is required to correctly handle | ||||
| // reorgs (where unsafe may be ahead on a non-canonical fork) and to trigger EL sync when | ||||
| // the local unsafe head lags behind the safe head. | ||||
| SynchronizeTask::new( | ||||
| Arc::clone(&self.client), | ||||
| self.cfg.clone(), | ||||
| self.attributes.clone(), | ||||
| self.is_attributes_derived, | ||||
| EngineSyncStateUpdate { | ||||
| unsafe_head: Some(*safe_l2), | ||||
| cross_unsafe_head: Some(*safe_l2), | ||||
| safe_head: Some(*safe_l2), | ||||
| local_safe_head: Some(*safe_l2), | ||||
| ..Default::default() | ||||
|
pcw109550 marked this conversation as resolved.
|
||||
| }, | ||||
| ) | ||||
| .await?; | ||||
| .execute(state) | ||||
| .await | ||||
| .map_err(|e| { | ||||
| warn!(target: "engine", ?e, "Apply safe head failed"); | ||||
| e | ||||
| })?; | ||||
|
|
||||
| let fcu_duration = fcu_start.elapsed(); | ||||
|
|
||||
| info!( | ||||
| target: "engine", | ||||
| hash = %safe_l2.block_info.hash, | ||||
| number = safe_l2.block_info.number, | ||||
| fcu_duration = ?fcu_duration, | ||||
| "Updated safe head via follow safe" | ||||
| ); | ||||
|
|
||||
| Ok(()) | ||||
| } | ||||
|
|
||||
| /// Handles the fallback case when the block doesn't match the input or does not exist. | ||||
| async fn reconcile_unsafe_to_safe( | ||||
| &self, | ||||
| state: &mut EngineState, | ||||
| ) -> Result<(), ConsolidateTaskError> { | ||||
| match &self.input { | ||||
| ConsolidateInput::Attributes(attributes) => { | ||||
| self.execute_build_and_seal_tasks(state, attributes).await | ||||
| } | ||||
| ConsolidateInput::BlockInfo(safe_l2) => { | ||||
| self.reconcile_to_safe_head(state, safe_l2).await | ||||
| } | ||||
| } | ||||
| } | ||||
|
|
||||
| /// Attempts consolidation on the engine state. | ||||
| pub async fn consolidate(&self, state: &mut EngineState) -> Result<(), ConsolidateTaskError> { | ||||
| let global_start = Instant::now(); | ||||
|
|
||||
| // Fetch the unsafe l2 block after the attributes parent. | ||||
| let block_num = self.attributes.block_number(); | ||||
| // Fetch the unsafe L2 block | ||||
| let block_num = self.input.l2_block_number(); | ||||
| let fetch_start = Instant::now(); | ||||
| let block = match self.client.l2_block_by_label(block_num.into()).await { | ||||
| Ok(Some(block)) => block, | ||||
|
|
@@ -63,24 +178,20 @@ impl<EngineClient_: EngineClient> ConsolidateTask<EngineClient_> { | |||
| } | ||||
| }; | ||||
| let block_fetch_duration = fetch_start.elapsed(); | ||||
|
|
||||
| // Attempt to consolidate the unsafe head. | ||||
| // If this is successful, the forkchoice change synchronizes. | ||||
| // Otherwise, the attributes need to be processed. | ||||
| let block_hash = block.header.hash; | ||||
| if crate::AttributesMatch::check(&self.cfg, &self.attributes, &block).is_match() { | ||||
|
|
||||
| if self.input.is_consistent_with_block(&self.cfg, &block) { | ||||
| trace!( | ||||
| target: "engine", | ||||
| attributes = ?self.attributes, | ||||
| input = ?self.input, | ||||
| block_hash = %block_hash, | ||||
| "Consolidating engine state", | ||||
| ); | ||||
|
|
||||
| match L2BlockInfo::from_block_and_genesis(&block.into_consensus(), &self.cfg.genesis) { | ||||
| // Only issue a forkchoice update if the attributes are the last in the span | ||||
| // batch. This is an optimization to avoid sending a FCU | ||||
| // call for every block in the span batch. | ||||
| Ok(block_info) if !self.attributes.is_last_in_span => { | ||||
| Ok(block_info) if !self.input.is_attributes_last_in_span() => { | ||||
| let total_duration = global_start.elapsed(); | ||||
|
|
||||
| // Apply a transient update to the safe head. | ||||
|
|
@@ -121,7 +232,6 @@ impl<EngineClient_: EngineClient> ConsolidateTask<EngineClient_> { | |||
| })?; | ||||
|
|
||||
| let fcu_duration = fcu_start.elapsed(); | ||||
|
|
||||
| let total_duration = global_start.elapsed(); | ||||
|
|
||||
| info!( | ||||
|
|
@@ -143,14 +253,15 @@ impl<EngineClient_: EngineClient> ConsolidateTask<EngineClient_> { | |||
| } | ||||
| } | ||||
|
|
||||
| // Otherwise, the attributes need to be processed. | ||||
| debug!( | ||||
| target: "engine", | ||||
| attributes = ?self.attributes, | ||||
| input = ?self.input, | ||||
| block_hash = %block_hash, | ||||
| "Attributes mismatch! Executing build task to initiate reorg", | ||||
| "ConsolidateInput mismatch! Initiating reorg", | ||||
| ); | ||||
| self.execute_build_and_seal_tasks(state).await | ||||
| // Handle mismatch case - called when consistency check fails | ||||
| // or when L2BlockInfo construction fails in Attributes branch | ||||
| self.reconcile_unsafe_to_safe(state).await | ||||
| } | ||||
| } | ||||
|
|
||||
|
|
@@ -160,14 +271,25 @@ impl<EngineClient_: EngineClient> EngineTaskExt for ConsolidateTask<EngineClient | |||
|
|
||||
| type Error = ConsolidateTaskError; | ||||
|
|
||||
| // Behavior depends on how the safe head is provided: | ||||
| // | ||||
| // - `Attributes`: The safe head is advanced through the normal derivation flow, where the | ||||
| // DerivationActor and EngineActor coordinate both safe and unsafe heads. In this case, we | ||||
| // consolidate as long as the unsafe head has not fallen behind. | ||||
| // | ||||
| // - `BlockInfo`: The safe head is injected externally by the DerivationActor while delegating | ||||
| // derivation, and is not coordinated with the EngineActor's safe/unsafe heads. If the | ||||
| // injected safe head is ahead of the EngineActor's unsafe head, we reconcile the unsafe chain | ||||
| // up to the safe head instead of consolidating. | ||||
| async fn execute(&self, state: &mut EngineState) -> Result<(), ConsolidateTaskError> { | ||||
| // Skip to building the payload attributes if consolidation is not needed. | ||||
| if state.sync_state.safe_head().block_info.number < | ||||
| state.sync_state.unsafe_head().block_info.number | ||||
| { | ||||
| let safe_head_number = match &self.input { | ||||
| ConsolidateInput::Attributes { .. } => state.sync_state.safe_head().block_info.number, | ||||
| ConsolidateInput::BlockInfo(safe_block_info) => safe_block_info.block_info.number, | ||||
| }; | ||||
| if safe_head_number < state.sync_state.unsafe_head().block_info.number { | ||||
| self.consolidate(state).await | ||||
| } else { | ||||
| self.execute_build_and_seal_tasks(state).await | ||||
| self.reconcile_unsafe_to_safe(state).await | ||||
| } | ||||
| } | ||||
| } | ||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: we could remove the option inside
DerivationDelegateArgsand instead just wrap the whole thing inside anOptionso that it is really optional to use the derivation delegation client:There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we rely on the optionality of
DerivationDelegateConfiginstead ofDerivationDelegateArgs?I followed the existing pattern for the
L2ClientArgs.