Skip to content

Commit 8612d5f

Browse files
committed
fix(driver): use trusted resume head for event sync (checkpoint head or head_l1_origin)
1 parent 16780da commit 8612d5f

File tree

5 files changed

+133
-32
lines changed

5 files changed

+133
-32
lines changed

.github/workflows/taiko-client-rs--docker.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ permissions:
44

55
on:
66
push:
7-
branches: [main]
7+
branches: [main, event-resume]
88
tags:
99
- "taiko-alethia-client-rs-v*"
1010
paths:

packages/taiko-client-rs/crates/preconfirmation-driver/src/runner/mod.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ impl PreconfirmationDriverRunner {
203203
node_handle.abort();
204204
match result {
205205
Ok(Ok(())) => Err(RunnerError::EventSyncerExited),
206-
Ok(Err(err)) => Err(RunnerError::Sync(err)),
206+
Ok(Err(err)) => Err(map_driver_task_error(err)),
207207
Err(err) => Err(RunnerError::EventSyncerFailed(err.to_string())),
208208
}
209209
}
@@ -218,3 +218,11 @@ impl PreconfirmationDriverRunner {
218218
run_result
219219
}
220220
}
221+
222+
/// Map a driver task error to a runner error, preserving sync errors but wrapping other driver errors.
223+
fn map_driver_task_error(err: driver::DriverError) -> RunnerError {
224+
match err {
225+
driver::DriverError::Sync(sync_err) => RunnerError::Sync(sync_err),
226+
other => RunnerError::Driver(other),
227+
}
228+
}

packages/taiko-client-rs/crates/preconfirmation-driver/src/runner/preconf_ingress_sync.rs

Lines changed: 46 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,7 @@ use std::{future::Future, sync::Arc};
55
use alloy_provider::{
66
Provider, RootProvider, fillers::FillProvider, utils::JoinedRecommendedFillers,
77
};
8-
use driver::{
9-
DriverConfig, SyncStage,
10-
sync::{SyncError, event::EventSyncer},
11-
};
8+
use driver::{DriverConfig, SyncPipeline, sync::event::EventSyncer};
129
use rpc::client::Client;
1310
use tokio::task::JoinHandle;
1411

@@ -21,16 +18,16 @@ where
2118
{
2219
client: Client<P>,
2320
event_syncer: Arc<EventSyncer<P>>,
24-
handle: JoinHandle<std::result::Result<(), SyncError>>,
21+
handle: JoinHandle<std::result::Result<(), driver::DriverError>>,
2522
}
2623

2724
impl PreconfIngressSync<FillProvider<JoinedRecommendedFillers, RootProvider>> {
28-
/// Start the preconfirmation ingress syncer and its background task.
25+
/// Start the preconfirmation ingress sync pipeline and its background task.
2926
pub(crate) async fn start(config: &DriverConfig) -> Result<Self, RunnerError> {
3027
let client = Client::new(config.client.clone()).await?;
31-
let event_syncer = Arc::new(EventSyncer::new(config, client.clone()).await?);
32-
let event_syncer_task = event_syncer.clone();
33-
let handle = tokio::spawn(async move { event_syncer_task.run().await });
28+
let pipeline = SyncPipeline::new(config.clone(), client.clone()).await?;
29+
let event_syncer = pipeline.event_syncer();
30+
let handle = tokio::spawn(async move { pipeline.run().await });
3431

3532
Ok(Self { client, event_syncer, handle })
3633
}
@@ -50,8 +47,10 @@ where
5047
self.event_syncer.clone()
5148
}
5249

53-
/// Get a mutable handle to the ingress syncer task.
54-
pub(crate) fn handle_mut(&mut self) -> &mut JoinHandle<std::result::Result<(), SyncError>> {
50+
/// Get a mutable handle to the ingress sync pipeline task.
51+
pub(crate) fn handle_mut(
52+
&mut self,
53+
) -> &mut JoinHandle<std::result::Result<(), driver::DriverError>> {
5554
&mut self.handle
5655
}
5756

@@ -65,10 +64,18 @@ where
6564
}
6665
}
6766

67+
/// Wait for the preconfirmation ingress sync to signal readiness or fail.
68+
fn map_driver_error(err: driver::DriverError) -> RunnerError {
69+
match err {
70+
driver::DriverError::Sync(sync_err) => RunnerError::Sync(sync_err),
71+
other => RunnerError::Driver(other),
72+
}
73+
}
74+
6875
/// Wait for preconfirmation ingress to be ready or the ingress syncer to exit.
6976
pub(crate) async fn wait_for_preconf_ingress_ready<F>(
7077
ready: F,
71-
event_syncer_handle: &mut JoinHandle<std::result::Result<(), SyncError>>,
78+
event_syncer_handle: &mut JoinHandle<std::result::Result<(), driver::DriverError>>,
7279
) -> Result<(), RunnerError>
7380
where
7481
F: Future<Output = Option<()>> + Send,
@@ -78,7 +85,7 @@ where
7885
result = event_syncer_handle => {
7986
match result {
8087
Ok(Ok(())) => Err(RunnerError::EventSyncerExited),
81-
Ok(Err(err)) => Err(RunnerError::Sync(err)),
88+
Ok(Err(err)) => Err(map_driver_error(err)),
8289
Err(err) => Err(RunnerError::EventSyncerFailed(err.to_string())),
8390
}
8491
}
@@ -87,15 +94,38 @@ where
8794

8895
#[cfg(test)]
8996
mod tests {
97+
use driver::{DriverError, sync::SyncError};
98+
9099
#[tokio::test]
91100
async fn wait_for_preconf_ingress_ready_errors_when_ready_none() {
92-
use driver::sync::SyncError;
93-
94101
let ready = async { None };
95-
let mut handle = tokio::spawn(async { Ok::<(), SyncError>(()) });
102+
let mut handle = tokio::spawn(async { Ok::<(), DriverError>(()) });
96103

97104
let err = super::wait_for_preconf_ingress_ready(ready, &mut handle).await.unwrap_err();
98105

99106
assert!(err.to_string().contains("preconfirmation ingress"));
100107
}
108+
109+
#[tokio::test]
110+
async fn wait_for_preconf_ingress_ready_maps_sync_driver_error() {
111+
let ready = std::future::pending::<Option<()>>();
112+
let mut handle = tokio::spawn(async {
113+
Err::<(), DriverError>(DriverError::Sync(SyncError::MissingCheckpointResumeHead))
114+
});
115+
116+
let err = super::wait_for_preconf_ingress_ready(ready, &mut handle).await.unwrap_err();
117+
118+
assert!(matches!(err, super::RunnerError::Sync(SyncError::MissingCheckpointResumeHead)));
119+
}
120+
121+
#[tokio::test]
122+
async fn wait_for_preconf_ingress_ready_maps_non_sync_driver_error() {
123+
let ready = std::future::pending::<Option<()>>();
124+
let mut handle =
125+
tokio::spawn(async { Err::<(), DriverError>(DriverError::PreconfirmationDisabled) });
126+
127+
let err = super::wait_for_preconf_ingress_ready(ready, &mut handle).await.unwrap_err();
128+
129+
assert!(matches!(err, super::RunnerError::Driver(DriverError::PreconfirmationDisabled)));
130+
}
101131
}

packages/taiko-client-rs/crates/whitelist-preconfirmation-driver/src/preconf_ingress_sync.rs

Lines changed: 68 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,7 @@ use std::{future::Future, sync::Arc};
55
use alloy_provider::{
66
Provider, RootProvider, fillers::FillProvider, utils::JoinedRecommendedFillers,
77
};
8-
use driver::{
9-
DriverConfig,
10-
sync::{SyncError, event::EventSyncer},
11-
};
8+
use driver::{DriverConfig, SyncPipeline, sync::event::EventSyncer};
129
use rpc::client::Client;
1310
use tokio::task::JoinHandle;
1411

@@ -23,17 +20,17 @@ where
2320
client: Client<P>,
2421
/// Shared event syncer that exposes ingress readiness and submit hooks.
2522
event_syncer: Arc<EventSyncer<P>>,
26-
/// Background task running the event sync loop.
27-
handle: JoinHandle<std::result::Result<(), SyncError>>,
23+
/// Background task running the sync pipeline loop.
24+
handle: JoinHandle<std::result::Result<(), driver::DriverError>>,
2825
}
2926

3027
impl PreconfIngressSync<FillProvider<JoinedRecommendedFillers, RootProvider>> {
31-
/// Start the event syncer and its background task.
28+
/// Start the sync pipeline and expose its event syncer and background task.
3229
pub(crate) async fn start(config: &DriverConfig) -> Result<Self> {
3330
let client = Client::new(config.client.clone()).await?;
34-
let event_syncer = Arc::new(EventSyncer::new(config, client.clone()).await?);
35-
let event_syncer_task = event_syncer.clone();
36-
let handle = tokio::spawn(async move { driver::SyncStage::run(&*event_syncer_task).await });
31+
let pipeline = SyncPipeline::new(config.clone(), client.clone()).await?;
32+
let event_syncer = pipeline.event_syncer();
33+
let handle = tokio::spawn(async move { pipeline.run().await });
3734

3835
Ok(Self { client, event_syncer, handle })
3936
}
@@ -54,7 +51,9 @@ where
5451
}
5552

5653
/// Access the background event syncer task handle.
57-
pub(crate) fn handle_mut(&mut self) -> &mut JoinHandle<std::result::Result<(), SyncError>> {
54+
pub(crate) fn handle_mut(
55+
&mut self,
56+
) -> &mut JoinHandle<std::result::Result<(), driver::DriverError>> {
5857
&mut self.handle
5958
}
6059

@@ -68,10 +67,18 @@ where
6867
}
6968
}
7069

70+
/// Map a driver error to a whitelist preconfirmation driver error, preserving sync errors but wrapping others.
71+
fn map_driver_error(err: driver::DriverError) -> WhitelistPreconfirmationDriverError {
72+
match err {
73+
driver::DriverError::Sync(sync_err) => WhitelistPreconfirmationDriverError::Sync(sync_err),
74+
other => WhitelistPreconfirmationDriverError::Driver(other),
75+
}
76+
}
77+
7178
/// Wait for ingress readiness, or return if the event syncer exits first.
7279
pub(crate) async fn wait_for_preconf_ingress_ready<F>(
7380
ready: F,
74-
event_syncer_handle: &mut JoinHandle<std::result::Result<(), SyncError>>,
81+
event_syncer_handle: &mut JoinHandle<std::result::Result<(), driver::DriverError>>,
7582
) -> Result<()>
7683
where
7784
F: Future<Output = Option<()>> + Send,
@@ -84,9 +91,57 @@ where
8491
result = event_syncer_handle => {
8592
match result {
8693
Ok(Ok(())) => Err(WhitelistPreconfirmationDriverError::EventSyncerExited),
87-
Ok(Err(err)) => Err(WhitelistPreconfirmationDriverError::Sync(err)),
94+
Ok(Err(err)) => Err(map_driver_error(err)),
8895
Err(err) => Err(WhitelistPreconfirmationDriverError::EventSyncerFailed(err.to_string())),
8996
}
9097
}
9198
}
9299
}
100+
101+
#[cfg(test)]
102+
mod tests {
103+
use driver::{DriverError, sync::SyncError};
104+
105+
#[tokio::test]
106+
async fn wait_for_preconf_ingress_ready_errors_when_ready_none() {
107+
let ready = async { None };
108+
let mut handle = tokio::spawn(async { Ok::<(), DriverError>(()) });
109+
110+
let err = super::wait_for_preconf_ingress_ready(ready, &mut handle).await.unwrap_err();
111+
assert!(matches!(
112+
err,
113+
super::WhitelistPreconfirmationDriverError::PreconfIngressNotEnabled
114+
));
115+
}
116+
117+
#[tokio::test]
118+
async fn wait_for_preconf_ingress_ready_maps_sync_driver_error() {
119+
let ready = std::future::pending::<Option<()>>();
120+
let mut handle = tokio::spawn(async {
121+
Err::<(), DriverError>(DriverError::Sync(SyncError::MissingCheckpointResumeHead))
122+
});
123+
124+
let err = super::wait_for_preconf_ingress_ready(ready, &mut handle).await.unwrap_err();
125+
assert!(matches!(
126+
err,
127+
super::WhitelistPreconfirmationDriverError::Sync(
128+
SyncError::MissingCheckpointResumeHead
129+
)
130+
));
131+
}
132+
133+
#[tokio::test]
134+
async fn wait_for_preconf_ingress_ready_maps_non_sync_driver_error() {
135+
let ready = std::future::pending::<Option<()>>();
136+
let mut handle =
137+
tokio::spawn(async { Err::<(), DriverError>(DriverError::PreconfirmationDisabled) });
138+
139+
let err = super::wait_for_preconf_ingress_ready(ready, &mut handle).await.unwrap_err();
140+
assert!(matches!(
141+
err,
142+
super::WhitelistPreconfirmationDriverError::Driver(
143+
DriverError::PreconfirmationDisabled
144+
)
145+
));
146+
}
147+
}

packages/taiko-client-rs/crates/whitelist-preconfirmation-driver/src/runner.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ impl WhitelistPreconfirmationDriverRunner {
235235
"reason" => "event_syncer_error",
236236
)
237237
.increment(1);
238-
Err(WhitelistPreconfirmationDriverError::Sync(err))
238+
Err(map_driver_task_error(err))
239239
}
240240
Err(err) => {
241241
metrics::counter!(
@@ -278,3 +278,11 @@ impl WhitelistPreconfirmationDriverRunner {
278278
}
279279
}
280280
}
281+
282+
/// Runs event sync plus whitelist preconfirmation message ingestion, with optional REST/WS server for external access.
283+
fn map_driver_task_error(err: driver::DriverError) -> WhitelistPreconfirmationDriverError {
284+
match err {
285+
driver::DriverError::Sync(sync_err) => WhitelistPreconfirmationDriverError::Sync(sync_err),
286+
other => WhitelistPreconfirmationDriverError::Driver(other),
287+
}
288+
}

0 commit comments

Comments
 (0)