Skip to content

Commit 9e5b426

Browse files
fix(taiko-client-rs): make Shasta event sync resilient on genesis and pre-finality startup (#21348)
Co-authored-by: Maciej Skrzypkowski <mskr@gmx.com>
1 parent 087a04c commit 9e5b426

File tree

9 files changed

+236
-138
lines changed

9 files changed

+236
-138
lines changed

packages/taiko-client-rs/crates/driver/src/sync/engine.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,12 @@ fn envelope_into_submission(
275275
) -> (ExecutionPayloadInputV2, B256, u64) {
276276
match envelope.execution_payload {
277277
ExecutionPayloadFieldV2::V1(payload) => (
278-
ExecutionPayloadInputV2 { execution_payload: payload.clone(), withdrawals: None },
278+
// Taiko chains are always post-Shanghai so withdrawals must be non-nil even
279+
// when the engine returns a V1 envelope (which omits the withdrawals field).
280+
ExecutionPayloadInputV2 {
281+
execution_payload: payload.clone(),
282+
withdrawals: Some(Vec::new()),
283+
},
279284
payload.block_hash,
280285
payload.block_number,
281286
),

packages/taiko-client-rs/crates/driver/src/sync/error.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,6 @@ pub enum SyncError {
5656
number: u64,
5757
},
5858

59-
/// Event sync: finalized L1 block is unavailable; resume must fail closed.
60-
#[error("finalized l1 block is unavailable")]
61-
MissingFinalizedL1Block,
62-
6359
/// Event sync: execution engine missing batch-to-block mapping.
6460
#[error("no execution block found for batch {proposal_id}")]
6561
MissingExecutionBlockForBatch {

packages/taiko-client-rs/crates/driver/src/sync/event.rs

Lines changed: 134 additions & 77 deletions
Large diffs are not rendered by default.

packages/taiko-client-rs/crates/rpc/src/auth.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,15 @@ impl<P: Provider + Clone> Client<P> {
201201
let mut payload_value = serde_json::to_value(&payload.execution_payload)
202202
.map_err(|err| RpcClientError::Other(anyhow!(err)))?;
203203
if let serde_json::Value::Object(ref mut obj) = payload_value {
204+
// Include the withdrawals list so taiko-geth can reconstruct the full block.
205+
// The Go driver sends the full ExecutableData (with withdrawals); omitting
206+
// this field causes a blockhash mismatch because geth cannot recompute the
207+
// withdrawals root from the hash alone.
208+
if let Some(ref withdrawals) = payload.withdrawals {
209+
let withdrawals_value = serde_json::to_value(withdrawals)
210+
.map_err(|err| RpcClientError::Other(anyhow!(err)))?;
211+
obj.insert("withdrawals".to_string(), withdrawals_value);
212+
}
204213
obj.insert(
205214
"txHash".to_string(),
206215
serde_json::Value::String(format!("{:#066x}", sidecar.tx_hash)),
@@ -224,6 +233,17 @@ impl<P: Provider + Clone> Client<P> {
224233
forkchoice_state: ForkchoiceState,
225234
payload_attributes: Option<TaikoPayloadAttributes>,
226235
) -> Result<ForkchoiceUpdated> {
236+
let forkchoice_state = serde_json::to_value(forkchoice_state)
237+
.map_err(|err| RpcClientError::Other(anyhow!(err)))?;
238+
239+
let payload_attributes = match payload_attributes {
240+
Some(payload_attributes) => Some(
241+
serde_json::to_value(&payload_attributes)
242+
.map_err(|err| RpcClientError::Other(anyhow!(err)))?,
243+
),
244+
None => None,
245+
};
246+
227247
self.l2_auth_provider
228248
.raw_request(
229249
Cow::Borrowed(TaikoEngineMethod::ForkchoiceUpdatedV2.as_str()),

packages/taiko-client-rs/crates/whitelist-preconfirmation-driver/src/importer/cache_import.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,11 @@ where
213213
"inserted whitelist preconfirmation block"
214214
);
215215

216+
if let Some(ref highest) = self.highest_unsafe_l2_payload_block_id {
217+
let mut guard = highest.lock().await;
218+
*guard = block_number.max(*guard);
219+
}
220+
216221
Ok(true)
217222
}
218223
}

packages/taiko-client-rs/crates/whitelist-preconfirmation-driver/src/importer/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use alloy_provider::Provider;
88
use bindings::preconf_whitelist::PreconfWhitelist::PreconfWhitelistInstance;
99
use driver::sync::event::EventSyncer;
1010
use rpc::client::Client;
11-
use tokio::sync::mpsc;
11+
use tokio::sync::{Mutex, mpsc};
1212
use tracing::{debug, info, warn};
1313

1414
use crate::{
@@ -68,6 +68,8 @@ where
6868
sequencer_cache: WhitelistSequencerCache,
6969
/// Command channel used to publish P2P requests/responses.
7070
network_command_tx: mpsc::Sender<NetworkCommand>,
71+
/// Shared highest unsafe L2 payload block ID (updated on P2P import when REST server enabled).
72+
highest_unsafe_l2_payload_block_id: Option<Arc<Mutex<u64>>>,
7173
/// Latched flag indicating event sync has exposed a head L1 origin.
7274
sync_ready: bool,
7375
/// Shasta anchor contract address used to validate the first transaction.
@@ -85,6 +87,7 @@ where
8587
whitelist_address: Address,
8688
chain_id: u64,
8789
network_command_tx: mpsc::Sender<NetworkCommand>,
90+
highest_unsafe_l2_payload_block_id: Option<Arc<Mutex<u64>>>,
8891
) -> Self {
8992
let whitelist = PreconfWhitelistInstance::new(whitelist_address, rpc.l1_provider.clone());
9093
let anchor_address = *rpc.shasta.anchor.address();
@@ -99,6 +102,7 @@ where
99102
request_throttle: RequestThrottle::default(),
100103
sequencer_cache: WhitelistSequencerCache::default(),
101104
network_command_tx,
105+
highest_unsafe_l2_payload_block_id,
102106
sync_ready: false,
103107
anchor_address,
104108
};

packages/taiko-client-rs/crates/whitelist-preconfirmation-driver/src/rest/types.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ pub struct WhitelistStatus {
159159
pub highest_unsafe_block_number: u64,
160160
/// Local libp2p peer ID.
161161
pub peer_id: String,
162-
/// Whether event sync has established a head L1 origin.
162+
/// Whether preconfirmation ingress is ready.
163163
pub sync_ready: bool,
164164
/// Sequencing lookahead information.
165165
#[serde(skip_serializing_if = "Option::is_none")]

packages/taiko-client-rs/crates/whitelist-preconfirmation-driver/src/rest_handler.rs

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,8 @@ where
7373
build_preconf_lock: Mutex<()>,
7474
/// Preconf whitelist contract used for operator checks.
7575
whitelist: PreconfWhitelistInstance<P>,
76-
/// Highest unsafe payload block ID tracked by this node.
77-
highest_unsafe_l2_payload_block_id: Mutex<u64>,
76+
/// Highest unsafe payload block ID tracked by this node (shared with importer).
77+
highest_unsafe_l2_payload_block_id: Arc<Mutex<u64>>,
7878
/// End-of-sequencing hash cache keyed by epoch.
7979
end_of_sequencing_by_epoch: Mutex<HashMap<u64, B256>>,
8080
/// Broadcast channel for REST `/ws` end-of-sequencing notifications.
@@ -98,8 +98,8 @@ where
9898
pub beacon_client: Arc<BeaconClient>,
9999
/// Whitelist contract address used for operator checks.
100100
pub whitelist_address: Address,
101-
/// Highest unsafe payload block ID tracked by this node at startup.
102-
pub initial_highest_unsafe_l2_payload_block_id: u64,
101+
/// Shared highest unsafe payload block ID (also updated by importer on P2P import).
102+
pub highest_unsafe_l2_payload_block_id: Arc<Mutex<u64>>,
103103
/// Channel used to publish messages to the P2P network.
104104
pub network_command_tx: mpsc::Sender<NetworkCommand>,
105105
/// Local peer ID string.
@@ -119,7 +119,7 @@ where
119119
signer,
120120
beacon_client,
121121
whitelist_address,
122-
initial_highest_unsafe_l2_payload_block_id,
122+
highest_unsafe_l2_payload_block_id,
123123
network_command_tx,
124124
local_peer_id,
125125
} = params;
@@ -132,9 +132,7 @@ where
132132
signer,
133133
beacon_client,
134134
whitelist,
135-
highest_unsafe_l2_payload_block_id: Mutex::new(
136-
initial_highest_unsafe_l2_payload_block_id,
137-
),
135+
highest_unsafe_l2_payload_block_id,
138136
end_of_sequencing_by_epoch: Mutex::new(HashMap::new()),
139137
eos_notification_tx,
140138
network_command_tx,
@@ -375,8 +373,14 @@ where
375373
let started_at = Instant::now();
376374
let _build_guard = self.build_preconf_lock.lock().await;
377375

376+
// Guard against building on a genuinely syncing node, but tolerate the false-
377+
// positive that taiko-geth emits on genesis chains (currentBlock == highestBlock
378+
// == 0, txIndexRemainingBlocks = 1). When current == highest the node is not
379+
// actually catching up to a remote peer, so we allow the build to proceed.
378380
let sync_status = self.rpc.l2_provider.syncing().await.map_err(provider_err)?;
379-
if matches!(sync_status, SyncStatus::Info(_)) {
381+
if let SyncStatus::Info(ref info) = sync_status &&
382+
info.current_block < info.highest_block
383+
{
380384
return Err(WhitelistPreconfirmationDriverError::Driver(
381385
driver::DriverError::EngineSyncing(request.block_number),
382386
));
@@ -520,7 +524,9 @@ where
520524
.get(&current_epoch)
521525
.copied()
522526
.map(|hash| hash.to_string());
523-
let sync_ready = head_l1_origin_block_id.is_some();
527+
// sync_ready reflects ingress readiness, which already includes the confirmed-sync
528+
// and scanner-live checks required by the event syncer.
529+
let sync_ready = self.event_syncer.is_preconf_ingress_ready();
524530

525531
Ok(WhitelistStatus {
526532
head_l1_origin_block_id,

packages/taiko-client-rs/crates/whitelist-preconfirmation-driver/src/runner.rs

Lines changed: 49 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use driver::{DriverConfig, map_driver_error};
1414
use preconfirmation_net::P2pConfig;
1515
use protocol::signer::FixedKSigner;
1616
use rpc::beacon::BeaconClient;
17-
use tokio::time;
17+
use tokio::{sync::Mutex, time};
1818
use tracing::{info, warn};
1919

2020
use crate::{
@@ -227,62 +227,67 @@ impl WhitelistPreconfirmationDriverRunner {
227227
);
228228

229229
// Optionally start the REST/WS server when both rpc_listen_addr and p2p_signer_key
230-
// are configured.
231-
let mut rest_ws_server = if let (Some(listen_addr), Some(signer_key)) =
232-
(self.config.rpc_listen_addr, &self.config.p2p_signer_key)
233-
{
234-
let beacon_client = Arc::new(
235-
BeaconClient::new(self.config.driver_config.l1_beacon_endpoint.clone())
236-
.await
237-
.map_err(|err| WhitelistPreconfirmationDriverError::RestWsServerBeaconInit {
230+
// are configured. When enabled, create shared state for highestUnsafeL2PayloadBlockID
231+
// so the importer can update it on P2P imports.
232+
let (mut rest_ws_server, shared_highest_unsafe) =
233+
if let (Some(listen_addr), Some(signer_key)) =
234+
(self.config.rpc_listen_addr, &self.config.p2p_signer_key)
235+
{
236+
let beacon_client = Arc::new(
237+
BeaconClient::new(self.config.driver_config.l1_beacon_endpoint.clone())
238+
.await
239+
.map_err(|err| WhitelistPreconfirmationDriverError::RestWsServerBeaconInit {
238240
reason: err.to_string(),
239241
})?,
240-
);
242+
);
241243

242-
let signer = FixedKSigner::new(signer_key).map_err(|e| {
243-
WhitelistPreconfirmationDriverError::Signing(format!(
244-
"failed to create P2P signer: {e}"
245-
))
246-
})?;
247-
let initial_highest_unsafe_l2_payload_block_id =
248-
initial_highest_unsafe_l2_payload_block_id(&preconf_ingress_sync).await;
244+
let signer = FixedKSigner::new(signer_key).map_err(|e| {
245+
WhitelistPreconfirmationDriverError::Signing(format!(
246+
"failed to create P2P signer: {e}"
247+
))
248+
})?;
249+
let initial_highest_unsafe_l2_payload_block_id =
250+
initial_highest_unsafe_l2_payload_block_id(&preconf_ingress_sync).await;
251+
let shared_highest =
252+
Arc::new(Mutex::new(initial_highest_unsafe_l2_payload_block_id));
249253

250-
let handler = WhitelistRestHandler::new(WhitelistRestHandlerParams {
251-
event_syncer: preconf_ingress_sync.event_syncer(),
252-
rpc: preconf_ingress_sync.client().clone(),
253-
chain_id: self.config.p2p_config.chain_id,
254-
signer,
255-
beacon_client,
256-
whitelist_address: self.config.whitelist_address,
257-
initial_highest_unsafe_l2_payload_block_id,
258-
network_command_tx: network.command_tx.clone(),
259-
local_peer_id: network.local_peer_id.to_string(),
260-
});
254+
let handler = WhitelistRestHandler::new(WhitelistRestHandlerParams {
255+
event_syncer: preconf_ingress_sync.event_syncer(),
256+
rpc: preconf_ingress_sync.client().clone(),
257+
chain_id: self.config.p2p_config.chain_id,
258+
signer,
259+
beacon_client,
260+
whitelist_address: self.config.whitelist_address,
261+
highest_unsafe_l2_payload_block_id: shared_highest.clone(),
262+
network_command_tx: network.command_tx.clone(),
263+
local_peer_id: network.local_peer_id.to_string(),
264+
});
261265

262-
let server_config = WhitelistRestWsServerConfig {
263-
listen_addr,
264-
jwt_secret: self.config.rpc_jwt_secret.clone(),
265-
cors_origins: self.config.rpc_cors_origins.clone(),
266-
..Default::default()
266+
let server_config = WhitelistRestWsServerConfig {
267+
listen_addr,
268+
jwt_secret: self.config.rpc_jwt_secret.clone(),
269+
cors_origins: self.config.rpc_cors_origins.clone(),
270+
..Default::default()
271+
};
272+
let server = WhitelistRestWsServer::start(server_config, Arc::new(handler)).await?;
273+
info!(
274+
addr = %server.local_addr(),
275+
http_url = %server.http_url(),
276+
ws_url = %server.ws_url(),
277+
"whitelist preconfirmation REST server started"
278+
);
279+
(Some(server), Some(shared_highest))
280+
} else {
281+
(None, None)
267282
};
268-
let server = WhitelistRestWsServer::start(server_config, Arc::new(handler)).await?;
269-
info!(
270-
addr = %server.local_addr(),
271-
http_url = %server.http_url(),
272-
ws_url = %server.ws_url(),
273-
"whitelist preconfirmation REST server started"
274-
);
275-
Some(server)
276-
} else {
277-
None
278-
};
279283

280284
let mut importer = WhitelistPreconfirmationImporter::new(
281285
preconf_ingress_sync.event_syncer(),
282286
preconf_ingress_sync.client().clone(),
283287
self.config.whitelist_address,
284288
self.config.p2p_config.chain_id,
285289
network.command_tx.clone(),
290+
shared_highest_unsafe,
286291
);
287292
let mut epoch_tick = time::interval(Duration::from_secs(L1_EPOCH_DURATION_SECS));
288293

0 commit comments

Comments
 (0)