Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add priority-queue #519

Open
wants to merge 4 commits into
base: pacaya-dev-metrics
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions host/src/server/api/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ async fn set_ballot(
Json(probs): Json<BTreeMap<ProofType, f64>>,
) -> HostResult<&'static str> {
let ballot = Ballot::new(probs).map_err(|e| anyhow::anyhow!(e))?;
actor.set_ballot(ballot);
actor.set_ballot(ballot).await;
Ok("Ballot set successfully")
}

async fn get_ballot(State(actor): State<Actor>) -> Response {
let ballot = actor.get_ballot().probabilities().to_owned();
let ballot = actor.get_ballot().await.probabilities().to_owned();
Json(ballot).into_response()
}
6 changes: 5 additions & 1 deletion host/src/server/api/v2/proof/prune.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,15 @@ use utoipa::OpenApi;
)]
/// Prune all tasks.
async fn prune_handler(State(actor): State<Actor>) -> HostResult<()> {
let statuses = actor.pool_list_status().map_err(|e| anyhow::anyhow!(e))?;
let statuses = actor
.pool_list_status()
.await
.map_err(|e| anyhow::anyhow!(e))?;
for (key, status) in statuses {
tracing::info!("Pruning task: {key} with status: {status}");
let _ = actor
.pool_remove_request(&key)
.await
.map_err(|e| anyhow::anyhow!(e))?;
}
Ok(())
Expand Down
5 changes: 4 additions & 1 deletion host/src/server/api/v2/proof/report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ use utoipa::OpenApi;
///
/// Retrieve a list of `{ chain_id, blockhash, prover_type, prover, status }` items.
async fn report_handler(State(actor): State<Actor>) -> HostResult<Json<Value>> {
let statuses = actor.pool_list_status().map_err(|e| anyhow::anyhow!(e))?;
let statuses = actor
.pool_list_status()
.await
.map_err(|e| anyhow::anyhow!(e))?;

// For compatibility with the old API, we need to convert the statuses to the old format.
let to_task_status = |status: StatusWithContext| match status.into_status() {
Expand Down
6 changes: 5 additions & 1 deletion host/src/server/api/v3/proof/aggregate/prune.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,15 @@ use utoipa::OpenApi;
)]
/// Prune all aggregation tasks.
async fn prune_handler(State(actor): State<Actor>) -> HostResult<()> {
let statuses = actor.pool_list_status().map_err(|e| anyhow::anyhow!(e))?;
let statuses = actor
.pool_list_status()
.await
.map_err(|e| anyhow::anyhow!(e))?;
for (key, status) in statuses {
tracing::info!("Pruning task: {key} with status: {status}");
let _ = actor
.pool_remove_request(&key)
.await
.map_err(|e| anyhow::anyhow!(e))?;
}
Ok(())
Expand Down
61 changes: 11 additions & 50 deletions host/src/server/handler.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use raiko_reqactor::{Action, Actor};
use raiko_reqactor::Actor;
use raiko_reqpool::{
AggregationRequestEntity, AggregationRequestKey, RequestEntity, RequestKey,
SingleProofRequestKey, Status,
Expand All @@ -13,18 +13,19 @@ pub async fn prove(
request_key: RequestKey,
request_entity: RequestEntity,
) -> Result<Status, String> {
let action = Action::Prove {
request_key,
request_entity,
start_time: chrono::Utc::now(),
};
act(actor, action).await
if actor.is_paused() {
return Err("System is paused".to_string());
}

actor
.act(request_key.clone(), request_entity, chrono::Utc::now())
.await
.map(|status| status.into_status())
}

/// Cancel the request.
pub async fn cancel(actor: &Actor, request_key: RequestKey) -> Result<Status, String> {
let action = Action::Cancel { request_key };
act(actor, action).await
pub async fn cancel(_actor: &Actor, _request_key: RequestKey) -> Result<Status, String> {
unimplemented!()
}

/// Prove the aggregation request and its sub-requests.
Expand Down Expand Up @@ -95,43 +96,3 @@ pub async fn cancel_aggregation(
}
cancel(actor, request_key.into()).await
}

// === Helper functions ===

// Send the action to the Actor and return the response status
async fn act(actor: &Actor, action: Action) -> Result<Status, String> {
// Check if the system is paused
if actor.is_paused() {
return Err("System is paused".to_string());
}

// Return early if the request is already succeeded
if let Ok(Some(status)) = actor.pool_get_status(&action.request_key()) {
if matches!(status.status(), Status::Success { .. }) {
return Ok(status.into_status());
}
}

// Just logging the status of the request
let _ = actor
.pool_get_status(&action.request_key())
.map(|status_opt| {
tracing::trace!(
"trace request in {request_key}: {status}",
request_key = action.request_key(),
status = status_opt
.map(|status| status.into_status().to_string())
.unwrap_or("None".to_string()),
)
});

// Send the action to the Actor and return the response status
actor.act(action.clone()).await.map(|status| {
tracing::trace!(
"trace request out {request_key}: {status}",
request_key = action.request_key(),
status = status.status()
);
status.into_status()
})
}
2 changes: 1 addition & 1 deletion host/src/server/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ pub async fn draw_for_zk_any_request(
"Missing block number".to_string(),
))?;
let (_, blockhash) = get_task_data(&network, block_number, actor.chain_specs()).await?;
Ok(actor.draw(&blockhash))
Ok(actor.draw(&blockhash).await)
}

pub fn fulfill_sp1_params(req: &mut Value) {
Expand Down
Loading
Loading