Skip to content

Commit 3432737

Browse files
authored
feat(raiko): retry task if the previous run failed. (#408)
* feat(raiko): retry task if error
  Signed-off-by: smtmfft <[email protected]>
* fix in-memory related tests
  Signed-off-by: smtmfft <[email protected]>
* fix lint
  Signed-off-by: smtmfft <[email protected]>
* fix CI test
  Signed-off-by: smtmfft <[email protected]>
---------
Signed-off-by: smtmfft <[email protected]>
1 parent 987241b commit 3432737

File tree

9 files changed

+266
-177
lines changed

9 files changed

+266
-177
lines changed

host/src/interfaces.rs

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -121,15 +121,15 @@ impl From<HostError> for TaskStatus {
121121
| HostError::JoinHandle(_)
122122
| HostError::InvalidAddress(_)
123123
| HostError::InvalidRequestConfig(_) => unreachable!(),
124-
HostError::Conversion(e) => TaskStatus::NonDbFailure(e),
125-
HostError::Serde(e) => TaskStatus::NonDbFailure(e.to_string()),
126-
HostError::Core(e) => TaskStatus::NonDbFailure(e.to_string()),
127-
HostError::Anyhow(e) => TaskStatus::NonDbFailure(e.to_string()),
128-
HostError::FeatureNotSupportedError(e) => TaskStatus::NonDbFailure(e.to_string()),
129-
HostError::Io(e) => TaskStatus::NonDbFailure(e.to_string()),
130-
HostError::RPC(_) => TaskStatus::NetworkFailure,
131-
HostError::Guest(_) => TaskStatus::ProofFailure_Generic,
132-
HostError::TaskManager(_) => TaskStatus::SqlDbCorruption,
124+
HostError::Conversion(e) => TaskStatus::IoFailure(e),
125+
HostError::Serde(e) => TaskStatus::IoFailure(e.to_string()),
126+
HostError::Core(e) => TaskStatus::GuestProverFailure(e.to_string()),
127+
HostError::Anyhow(e) => TaskStatus::AnyhowError(e.to_string()),
128+
HostError::FeatureNotSupportedError(_) => TaskStatus::InvalidOrUnsupportedBlock,
129+
HostError::Io(e) => TaskStatus::IoFailure(e.to_string()),
130+
HostError::RPC(e) => TaskStatus::NetworkFailure(e.to_string()),
131+
HostError::Guest(e) => TaskStatus::GuestProverFailure(e.to_string()),
132+
HostError::TaskManager(e) => TaskStatus::TaskDbCorruption(e.to_string()),
133133
}
134134
}
135135
}
@@ -142,15 +142,15 @@ impl From<&HostError> for TaskStatus {
142142
| HostError::JoinHandle(_)
143143
| HostError::InvalidAddress(_)
144144
| HostError::InvalidRequestConfig(_) => unreachable!(),
145-
HostError::Conversion(e) => TaskStatus::NonDbFailure(e.to_owned()),
146-
HostError::Serde(e) => TaskStatus::NonDbFailure(e.to_string()),
147-
HostError::Core(e) => TaskStatus::NonDbFailure(e.to_string()),
148-
HostError::Anyhow(e) => TaskStatus::NonDbFailure(e.to_string()),
149-
HostError::FeatureNotSupportedError(e) => TaskStatus::NonDbFailure(e.to_string()),
150-
HostError::Io(e) => TaskStatus::NonDbFailure(e.to_string()),
151-
HostError::RPC(_) => TaskStatus::NetworkFailure,
152-
HostError::Guest(_) => TaskStatus::ProofFailure_Generic,
153-
HostError::TaskManager(_) => TaskStatus::SqlDbCorruption,
145+
HostError::Conversion(e) => TaskStatus::GuestProverFailure(e.to_owned()),
146+
HostError::Serde(e) => TaskStatus::GuestProverFailure(e.to_string()),
147+
HostError::Core(e) => TaskStatus::GuestProverFailure(e.to_string()),
148+
HostError::Anyhow(e) => TaskStatus::AnyhowError(e.to_string()),
149+
HostError::FeatureNotSupportedError(e) => TaskStatus::GuestProverFailure(e.to_string()),
150+
HostError::Io(e) => TaskStatus::GuestProverFailure(e.to_string()),
151+
HostError::RPC(e) => TaskStatus::NetworkFailure(e.to_string()),
152+
HostError::Guest(e) => TaskStatus::GuestProverFailure(e.to_string()),
153+
HostError::TaskManager(e) => TaskStatus::TaskDbCorruption(e.to_string()),
154154
}
155155
}
156156
}

host/src/server/api/v2/proof/mod.rs

Lines changed: 33 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -64,42 +64,46 @@ async fn proof_handler(
6464

6565
let mut manager = prover_state.task_manager();
6666
let status = manager.get_task_proving_status(&key).await?;
67-
68-
let Some((latest_status, ..)) = status.0.last() else {
69-
// If there are no tasks with provided config, create a new one.
70-
manager.enqueue_task(&key).await?;
71-
72-
prover_state
73-
.task_channel
74-
.try_send(Message::from(&proof_request))?;
75-
76-
return Ok(TaskStatus::Registered.into());
77-
};
78-
79-
match latest_status {
80-
// If task has been cancelled add it to the queue again
81-
TaskStatus::Cancelled
82-
| TaskStatus::Cancelled_Aborted
83-
| TaskStatus::Cancelled_NeverStarted
84-
| TaskStatus::CancellationInProgress => {
85-
manager
86-
.update_task_progress(key, TaskStatus::Registered, None)
87-
.await?;
67+
match status.0.last() {
68+
Some((latest_status, ..)) => {
69+
match latest_status {
70+
// If task has been cancelled
71+
TaskStatus::Cancelled
72+
| TaskStatus::Cancelled_Aborted
73+
| TaskStatus::Cancelled_NeverStarted
74+
| TaskStatus::CancellationInProgress
75+
// or if the task is failed, add it to the queue again
76+
| TaskStatus::GuestProverFailure(_)
77+
| TaskStatus::UnspecifiedFailureReason => {
78+
manager
79+
.update_task_progress(key, TaskStatus::Registered, None)
80+
.await?;
81+
82+
prover_state
83+
.task_channel
84+
.try_send(Message::from(&proof_request))?;
85+
86+
Ok(TaskStatus::Registered.into())
87+
}
88+
// If the task has succeeded, return the proof.
89+
TaskStatus::Success => {
90+
let proof = manager.get_task_proof(&key).await?;
91+
92+
Ok(proof.into())
93+
}
94+
// For all other statuses just return the status.
95+
status => Ok(status.clone().into()),
96+
}
97+
}
98+
None => {
99+
manager.enqueue_task(&key).await?;
88100

89101
prover_state
90102
.task_channel
91103
.try_send(Message::from(&proof_request))?;
92104

93105
Ok(TaskStatus::Registered.into())
94106
}
95-
// If the task has succeeded, return the proof.
96-
TaskStatus::Success => {
97-
let proof = manager.get_task_proof(&key).await?;
98-
99-
Ok(proof.into())
100-
}
101-
// For all other statuses just return the status.
102-
status => Ok(status.clone().into()),
103107
}
104108
}
105109

host/src/server/api/v3/proof/mod.rs

Lines changed: 64 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -84,39 +84,42 @@ async fn proof_handler(
8484
for (key, req) in tasks.iter() {
8585
let status = manager.get_task_proving_status(key).await?;
8686

87-
let Some((latest_status, ..)) = status.0.last() else {
87+
if let Some((latest_status, ..)) = status.0.last() {
88+
match latest_status {
89+
// If task has been cancelled
90+
TaskStatus::Cancelled
91+
| TaskStatus::Cancelled_Aborted
92+
| TaskStatus::Cancelled_NeverStarted
93+
| TaskStatus::CancellationInProgress
94+
// or if the task is failed, add it to the queue again
95+
| TaskStatus::GuestProverFailure(_)
96+
| TaskStatus::UnspecifiedFailureReason
97+
=> {
98+
manager
99+
.update_task_progress(key.clone(), TaskStatus::Registered, None)
100+
.await?;
101+
prover_state.task_channel.try_send(Message::from(req))?;
102+
103+
is_registered = true;
104+
is_success = false;
105+
}
106+
// If the task has succeeded, return the proof.
107+
TaskStatus::Success => {}
108+
// For all other statuses just return the status.
109+
status => {
110+
statuses.push(status.clone());
111+
is_registered = false;
112+
is_success = false;
113+
}
114+
}
115+
} else {
88116
// If there are no tasks with provided config, create a new one.
89117
manager.enqueue_task(key).await?;
90118

91119
prover_state.task_channel.try_send(Message::from(req))?;
92120
is_registered = true;
93121
continue;
94122
};
95-
96-
match latest_status {
97-
// If task has been cancelled add it to the queue again
98-
TaskStatus::Cancelled
99-
| TaskStatus::Cancelled_Aborted
100-
| TaskStatus::Cancelled_NeverStarted
101-
| TaskStatus::CancellationInProgress => {
102-
manager
103-
.update_task_progress(key.clone(), TaskStatus::Registered, None)
104-
.await?;
105-
106-
prover_state.task_channel.try_send(Message::from(req))?;
107-
108-
is_registered = true;
109-
is_success = false;
110-
}
111-
// If the task has succeeded, return the proof.
112-
TaskStatus::Success => {}
113-
// For all other statuses just return the status.
114-
status => {
115-
statuses.push(status.clone());
116-
is_registered = false;
117-
is_success = false;
118-
}
119-
}
120123
}
121124

122125
if is_registered {
@@ -147,7 +150,40 @@ async fn proof_handler(
147150
.get_aggregation_task_proving_status(&aggregation_request)
148151
.await?;
149152

150-
let Some((latest_status, ..)) = status.0.last() else {
153+
if let Some((latest_status, ..)) = status.0.last() {
154+
match latest_status {
155+
// If task has been cancelled add it to the queue again
156+
TaskStatus::Cancelled
157+
| TaskStatus::Cancelled_Aborted
158+
| TaskStatus::Cancelled_NeverStarted
159+
| TaskStatus::CancellationInProgress
160+
// or if the task is failed, add it to the queue again
161+
| TaskStatus::GuestProverFailure(_)
162+
| TaskStatus::UnspecifiedFailureReason
163+
=> {
164+
manager
165+
.update_aggregation_task_progress(
166+
&aggregation_request,
167+
TaskStatus::Registered,
168+
None,
169+
)
170+
.await?;
171+
prover_state
172+
.task_channel
173+
.try_send(Message::from(aggregation_request))?;
174+
Ok(Status::from(TaskStatus::Registered))
175+
}
176+
// If the task has succeeded, return the proof.
177+
TaskStatus::Success => {
178+
let proof = manager
179+
.get_aggregation_task_proof(&aggregation_request)
180+
.await?;
181+
Ok(proof.into())
182+
}
183+
// For all other statuses just return the status.
184+
status => Ok(status.clone().into()),
185+
}
186+
} else {
151187
// If there are no tasks with provided config, create a new one.
152188
manager
153189
.enqueue_aggregation_task(&aggregation_request)
@@ -156,39 +192,7 @@ async fn proof_handler(
156192
prover_state
157193
.task_channel
158194
.try_send(Message::from(aggregation_request.clone()))?;
159-
return Ok(Status::from(TaskStatus::Registered));
160-
};
161-
162-
match latest_status {
163-
// If task has been cancelled add it to the queue again
164-
TaskStatus::Cancelled
165-
| TaskStatus::Cancelled_Aborted
166-
| TaskStatus::Cancelled_NeverStarted
167-
| TaskStatus::CancellationInProgress => {
168-
manager
169-
.update_aggregation_task_progress(
170-
&aggregation_request,
171-
TaskStatus::Registered,
172-
None,
173-
)
174-
.await?;
175-
176-
prover_state
177-
.task_channel
178-
.try_send(Message::from(aggregation_request))?;
179-
180-
Ok(Status::from(TaskStatus::Registered))
181-
}
182-
// If the task has succeeded, return the proof.
183-
TaskStatus::Success => {
184-
let proof = manager
185-
.get_aggregation_task_proof(&aggregation_request)
186-
.await?;
187-
188-
Ok(proof.into())
189-
}
190-
// For all other statuses just return the status.
191-
status => Ok(status.clone().into()),
195+
Ok(Status::from(TaskStatus::Registered))
192196
}
193197
} else {
194198
let status = statuses.into_iter().collect::<TaskStatus>();

provers/risc0/driver/src/bonsai.rs

Lines changed: 42 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ pub async fn maybe_prove<I: Serialize, O: Eq + Debug + Serialize + DeserializeOw
128128
assumptions: (Vec<impl Into<AssumptionReceipt>>, Vec<String>),
129129
proof_key: ProofKey,
130130
id_store: &mut Option<&mut dyn IdWrite>,
131-
) -> Option<(String, Receipt)> {
131+
) -> ProverResult<(String, Receipt)> {
132132
let (assumption_instances, assumption_uuids) = assumptions;
133133

134134
let encoded_output =
@@ -152,38 +152,52 @@ pub async fn maybe_prove<I: Serialize, O: Eq + Debug + Serialize + DeserializeOw
152152
Ok(_) => {}
153153
Err(e) => {
154154
error!("Failed to scale up bonsai: {e:?}");
155-
return None;
155+
return Err(ProverError::GuestError(
156+
"Failed to scale up bonsai".to_string(),
157+
));
156158
}
157159
}
158160
// query bonsai service until it works
159-
loop {
160-
match prove_bonsai(
161+
macro_rules! retry_with_backoff {
162+
($max_retries:expr, $retry_delay:expr, $operation:expr, $err_transform:expr) => {{
163+
let mut attempt = 0;
164+
loop {
165+
match $operation {
166+
Ok(result) => break Ok(result),
167+
Err(e) => {
168+
if attempt >= $max_retries {
169+
error!("Max retries ({}) reached, aborting...", $max_retries);
170+
break Err($err_transform(e));
171+
}
172+
warn!(
173+
"Operation failed (attempt {}/{}): {:?}",
174+
attempt + 1,
175+
$max_retries,
176+
e
177+
);
178+
tokio_async_sleep(Duration::from_secs($retry_delay)).await;
179+
attempt += 1;
180+
}
181+
}
182+
}
183+
}};
184+
}
185+
186+
let (uuid, receipt) = retry_with_backoff!(
187+
MAX_REQUEST_RETRY,
188+
20,
189+
prove_bonsai(
161190
encoded_input.clone(),
162191
elf,
163192
expected_output,
164193
assumption_uuids.clone(),
165194
proof_key,
166195
id_store,
167196
)
168-
.await
169-
{
170-
Ok((receipt_uuid, receipt)) => {
171-
break (receipt_uuid, receipt, false);
172-
}
173-
Err(BonsaiExecutionError::SdkFailure(err)) => {
174-
warn!("Bonsai SDK fail: {err:?}, keep tracking...");
175-
tokio_async_sleep(Duration::from_secs(15)).await;
176-
}
177-
Err(BonsaiExecutionError::Other(err)) => {
178-
warn!("Something wrong: {err:?}, keep tracking...");
179-
tokio_async_sleep(Duration::from_secs(15)).await;
180-
}
181-
Err(BonsaiExecutionError::Fatal(err)) => {
182-
error!("Fatal error on Bonsai: {err:?}");
183-
return None;
184-
}
185-
}
186-
}
197+
.await,
198+
|e| ProverError::GuestError(format!("Bonsai SDK call fail: {e:?}").to_string())
199+
)?;
200+
(uuid, receipt, false)
187201
} else {
188202
// run prover
189203
info!("start running local prover");
@@ -197,7 +211,9 @@ pub async fn maybe_prove<I: Serialize, O: Eq + Debug + Serialize + DeserializeOw
197211
Ok(receipt) => (Default::default(), receipt, false),
198212
Err(e) => {
199213
warn!("Failed to prove locally: {e:?}");
200-
return None;
214+
return Err(ProverError::GuestError(
215+
"Failed to prove locally".to_string(),
216+
));
201217
}
202218
}
203219
};
@@ -211,6 +227,7 @@ pub async fn maybe_prove<I: Serialize, O: Eq + Debug + Serialize + DeserializeOw
211227
info!("Prover succeeded");
212228
} else {
213229
error!("Output mismatch! Prover: {output_guest:?}, expected: {expected_output:?}");
230+
return Err(ProverError::GuestError("Output mismatch!".to_string()));
214231
}
215232

216233
// upload receipt to bonsai
@@ -229,7 +246,7 @@ pub async fn maybe_prove<I: Serialize, O: Eq + Debug + Serialize + DeserializeOw
229246
}
230247

231248
// return result
232-
Some(result)
249+
Ok(result)
233250
}
234251

235252
pub async fn upload_receipt(receipt: &Receipt) -> anyhow::Result<String> {

0 commit comments

Comments
 (0)