Skip to content

Commit 30dcb2e

Browse files
committed
feat(reqactor): add priority-queue
1 parent 1ff260a commit 30dcb2e

File tree

10 files changed

+560
-660
lines changed

10 files changed

+560
-660
lines changed

host/src/server/api/admin.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,11 @@ async fn set_ballot(
2727
Json(probs): Json<BTreeMap<ProofType, f64>>,
2828
) -> HostResult<&'static str> {
2929
let ballot = Ballot::new(probs).map_err(|e| anyhow::anyhow!(e))?;
30-
actor.set_ballot(ballot);
30+
actor.set_ballot(ballot).await;
3131
Ok("Ballot set successfully")
3232
}
3333

3434
async fn get_ballot(State(actor): State<Actor>) -> Response {
35-
let ballot = actor.get_ballot().probabilities().to_owned();
35+
let ballot = actor.get_ballot().await.probabilities().to_owned();
3636
Json(ballot).into_response()
3737
}

host/src/server/api/v2/proof/prune.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,15 @@ use utoipa::OpenApi;
1111
)]
1212
/// Prune all tasks.
1313
async fn prune_handler(State(actor): State<Actor>) -> HostResult<()> {
14-
let statuses = actor.pool_list_status().map_err(|e| anyhow::anyhow!(e))?;
14+
let statuses = actor
15+
.pool_list_status()
16+
.await
17+
.map_err(|e| anyhow::anyhow!(e))?;
1518
for (key, status) in statuses {
1619
tracing::info!("Pruning task: {key} with status: {status}");
1720
let _ = actor
1821
.pool_remove_request(&key)
22+
.await
1923
.map_err(|e| anyhow::anyhow!(e))?;
2024
}
2125
Ok(())

host/src/server/api/v2/proof/report.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@ use utoipa::OpenApi;
1919
///
2020
/// Retrieve a list of `{ chain_id, blockhash, prover_type, prover, status }` items.
2121
async fn report_handler(State(actor): State<Actor>) -> HostResult<Json<Value>> {
22-
let statuses = actor.pool_list_status().map_err(|e| anyhow::anyhow!(e))?;
22+
let statuses = actor
23+
.pool_list_status()
24+
.await
25+
.map_err(|e| anyhow::anyhow!(e))?;
2326

2427
// For compatibility with the old API, we need to convert the statuses to the old format.
2528
let to_task_status = |status: StatusWithContext| match status.into_status() {

host/src/server/api/v3/proof/aggregate/prune.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,15 @@ use utoipa::OpenApi;
1111
)]
1212
/// Prune all aggregation tasks.
1313
async fn prune_handler(State(actor): State<Actor>) -> HostResult<()> {
14-
let statuses = actor.pool_list_status().map_err(|e| anyhow::anyhow!(e))?;
14+
let statuses = actor
15+
.pool_list_status()
16+
.await
17+
.map_err(|e| anyhow::anyhow!(e))?;
1518
for (key, status) in statuses {
1619
tracing::info!("Pruning task: {key} with status: {status}");
1720
let _ = actor
1821
.pool_remove_request(&key)
22+
.await
1923
.map_err(|e| anyhow::anyhow!(e))?;
2024
}
2125
Ok(())

host/src/server/handler.rs

+11-50
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use raiko_reqactor::{Action, Actor};
1+
use raiko_reqactor::Actor;
22
use raiko_reqpool::{
33
AggregationRequestEntity, AggregationRequestKey, RequestEntity, RequestKey,
44
SingleProofRequestKey, Status,
@@ -13,18 +13,19 @@ pub async fn prove(
1313
request_key: RequestKey,
1414
request_entity: RequestEntity,
1515
) -> Result<Status, String> {
16-
let action = Action::Prove {
17-
request_key,
18-
request_entity,
19-
start_time: chrono::Utc::now(),
20-
};
21-
act(actor, action).await
16+
if actor.is_paused() {
17+
return Err("System is paused".to_string());
18+
}
19+
20+
actor
21+
.act(request_key.clone(), request_entity, chrono::Utc::now())
22+
.await
23+
.map(|status| status.into_status())
2224
}
2325

2426
/// Cancel the request.
25-
pub async fn cancel(actor: &Actor, request_key: RequestKey) -> Result<Status, String> {
26-
let action = Action::Cancel { request_key };
27-
act(actor, action).await
27+
pub async fn cancel(_actor: &Actor, _request_key: RequestKey) -> Result<Status, String> {
28+
unimplemented!()
2829
}
2930

3031
/// Prove the aggregation request and its sub-requests.
@@ -95,43 +96,3 @@ pub async fn cancel_aggregation(
9596
}
9697
cancel(actor, request_key.into()).await
9798
}
98-
99-
// === Helper functions ===
100-
101-
// Send the action to the Actor and return the response status
102-
async fn act(actor: &Actor, action: Action) -> Result<Status, String> {
103-
// Check if the system is paused
104-
if actor.is_paused() {
105-
return Err("System is paused".to_string());
106-
}
107-
108-
// Return early if the request is already succeeded
109-
if let Ok(Some(status)) = actor.pool_get_status(&action.request_key()) {
110-
if matches!(status.status(), Status::Success { .. }) {
111-
return Ok(status.into_status());
112-
}
113-
}
114-
115-
// Just logging the status of the request
116-
let _ = actor
117-
.pool_get_status(&action.request_key())
118-
.map(|status_opt| {
119-
tracing::trace!(
120-
"trace request in {request_key}: {status}",
121-
request_key = action.request_key(),
122-
status = status_opt
123-
.map(|status| status.into_status().to_string())
124-
.unwrap_or("None".to_string()),
125-
)
126-
});
127-
128-
// Send the action to the Actor and return the response status
129-
actor.act(action.clone()).await.map(|status| {
130-
tracing::trace!(
131-
"trace request out {request_key}: {status}",
132-
request_key = action.request_key(),
133-
status = status.status()
134-
);
135-
status.into_status()
136-
})
137-
}

host/src/server/utils.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ pub async fn draw_for_zk_any_request(
8787
"Missing block number".to_string(),
8888
))?;
8989
let (_, blockhash) = get_task_data(&network, block_number, actor.chain_specs()).await?;
90-
Ok(actor.draw(&blockhash))
90+
Ok(actor.draw(&blockhash).await)
9191
}
9292

9393
pub fn fulfill_sp1_params(req: &mut Value) {

0 commit comments

Comments
 (0)