Skip to content

Commit 4f302ce

Browse files
feat!(worker): make sure the asm worker returns only when the block has been fully processed (#15)
* feat!(asm-worker): make ASM worker into sync mode * chore: mention proper ticket in the todo
1 parent 051904e commit 4f302ce

File tree

5 files changed

+197
-149
lines changed

5 files changed

+197
-149
lines changed

crates/worker/src/errors.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,4 +55,10 @@ pub enum WorkerError {
5555

5656
#[error("Manifest hash out of bound (max {max}, requested {index})")]
5757
ManifestIndexOutOfBound { index: u64, max: u64 },
58+
59+
#[error("ASM worker exited unexpectedly")]
60+
WorkerExited,
61+
62+
#[error("unexpected error: {0}")]
63+
Unexpected(String),
5864
}

crates/worker/src/handle.rs

Lines changed: 35 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,21 @@
11
use async_trait::async_trait;
22
use strata_primitives::prelude::*;
3-
use strata_service::{CommandHandle, ServiceMonitor};
3+
use strata_service::{CommandHandle, ServiceError, ServiceMonitor};
44
use strata_state::BlockSubmitter;
5-
use tracing::warn;
65

7-
use crate::AsmWorkerStatus;
6+
use crate::{AsmWorkerStatus, WorkerError, message::AsmWorkerMessage};
87

98
/// Handle for interacting with the ASM worker service.
109
#[derive(Debug)]
1110
pub struct AsmWorkerHandle {
12-
command_handle: CommandHandle<L1BlockCommitment>,
11+
command_handle: CommandHandle<AsmWorkerMessage>,
1312
monitor: ServiceMonitor<AsmWorkerStatus>,
1413
}
1514

1615
impl AsmWorkerHandle {
1716
/// Create a new ASM worker handle from a service command handle.
1817
pub fn new(
19-
command_handle: CommandHandle<L1BlockCommitment>,
18+
command_handle: CommandHandle<AsmWorkerMessage>,
2019
monitor: ServiceMonitor<AsmWorkerStatus>,
2120
) -> Self {
2221
Self {
@@ -31,25 +30,45 @@ impl AsmWorkerHandle {
3130
pub fn monitor(&self) -> &ServiceMonitor<AsmWorkerStatus> {
3231
&self.monitor
3332
}
33+
34+
/// Returns the number of pending inputs that have not been processed yet.
35+
pub fn pending(&self) -> usize {
36+
self.command_handle.pending()
37+
}
3438
}
3539

3640
#[async_trait]
3741
impl BlockSubmitter for AsmWorkerHandle {
38-
/// Sends a new l1 block to the ASM service.
42+
/// Sends an L1 block to the ASM service and waits for processing to complete.
3943
fn submit_block(&self, block: L1BlockCommitment) -> anyhow::Result<()> {
40-
if self.command_handle.send_blocking(block).is_err() {
41-
warn!(%block, "ASM handle closed when submitting");
42-
}
43-
44-
Ok(())
44+
self.command_handle
45+
.send_and_wait_blocking(|completion| AsmWorkerMessage::SubmitBlock(block, completion))
46+
.map_err(convert_service_error)?
47+
.map_err(Into::into)
4548
}
4649

47-
/// Sends a new l1 block to the ASM service.
50+
/// Sends an L1 block to the ASM service and waits for processing to complete.
4851
async fn submit_block_async(&self, block: L1BlockCommitment) -> anyhow::Result<()> {
49-
if self.command_handle.send(block).await.is_err() {
50-
warn!(%block, "ASM handle closed when submitting");
51-
}
52+
self.command_handle
53+
.send_and_wait(|completion| AsmWorkerMessage::SubmitBlock(block, completion))
54+
.await
55+
.map_err(convert_service_error)?
56+
.map_err(Into::into)
57+
}
58+
}
5259

53-
Ok(())
60+
/// Convert service framework errors to worker errors.
61+
fn convert_service_error(err: ServiceError) -> WorkerError {
62+
match err {
63+
ServiceError::WorkerExited | ServiceError::WorkerExitedWithoutResponse => {
64+
WorkerError::WorkerExited
65+
}
66+
ServiceError::WaitCancelled => {
67+
WorkerError::Unexpected("operation was cancelled".to_string())
68+
}
69+
ServiceError::BlockingThreadPanic(msg) => {
70+
WorkerError::Unexpected(format!("blocking thread panicked: {msg}"))
71+
}
72+
ServiceError::UnknownInputErr => WorkerError::Unexpected("unknown input error".to_string()),
5473
}
5574
}

crates/worker/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ pub use aux_resolver::AuxDataResolver;
1717
pub use builder::AsmWorkerBuilder;
1818
pub use errors::{WorkerError, WorkerResult};
1919
pub use handle::AsmWorkerHandle;
20-
pub use message::SubprotocolMessage;
20+
pub use message::{AsmWorkerMessage, SubprotocolMessage};
2121
pub use service::{AsmWorkerService, AsmWorkerStatus};
2222
pub use state::AsmWorkerServiceState;
2323
pub use traits::WorkerContext;

crates/worker/src/message.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,22 @@
11
//! Messages from the handle to the worker.
22
33
use strata_primitives::prelude::*;
4+
use strata_service::CommandCompletionSender;
45
use strata_state::asm_state::AsmState;
56

7+
use crate::WorkerResult;
8+
69
/// Messages from the ASM Handle to the subprotocol to give it work to do.
710
#[derive(Debug)]
811
pub enum SubprotocolMessage {
912
NewAsmState(AsmState, L1BlockCommitment),
1013
}
14+
15+
/// Messages from the handle to the ASM worker, with a completion sender to
16+
/// return the processing result.
17+
#[derive(Debug)]
18+
pub enum AsmWorkerMessage {
19+
/// Submit an L1 block for ASM processing. The completion sender receives
20+
/// the result once the block has been fully processed.
21+
SubmitBlock(L1BlockCommitment, CommandCompletionSender<WorkerResult<()>>),
22+
}

0 commit comments

Comments
 (0)