diff --git a/.gitignore b/.gitignore index 13a00914..c4d45bfe 100644 --- a/.gitignore +++ b/.gitignore @@ -64,6 +64,7 @@ target/ # Tests artifacts mutants.out* lcov.info +functional-tests/_dd_parallel_shard*/ # Unit test / coverage reports htmlcov/ @@ -97,4 +98,4 @@ fuzz-*.log # Docker volume data docker/vol/fdb/data/ -docker/vol/mosaic-*/tables/ \ No newline at end of file +docker/vol/mosaic-*/tables/ diff --git a/crates/job/api/src/submission.rs b/crates/job/api/src/submission.rs index f32c34a1..a401c419 100644 --- a/crates/job/api/src/submission.rs +++ b/crates/job/api/src/submission.rs @@ -102,7 +102,7 @@ impl JobActions { /// /// The peer ID on [`JobCompletion`] plus the variant here (garbler vs /// evaluator) identifies which SM to deliver to. -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum ActionCompletion { /// Garbler tracked action completed. Garbler { @@ -180,7 +180,7 @@ impl ActionCompletion { /// /// Jobs always retry internally until they succeed — every submitted action /// eventually produces exactly one completion. There is no failure variant. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct JobCompletion { /// The peer whose SM this result should be routed to. pub peer_id: PeerId, diff --git a/crates/job/scheduler/src/garbling/mod.rs b/crates/job/scheduler/src/garbling/mod.rs index 363618be..2a5c8969 100644 --- a/crates/job/scheduler/src/garbling/mod.rs +++ b/crates/job/scheduler/src/garbling/mod.rs @@ -58,6 +58,8 @@ use mosaic_job_api::{ use mosaic_net_svc_api::PeerId; use tracing::Instrument; +use crate::SchedulerFault; + /// Size of each v5c gate record in bytes (3 × u32). const GATE_SIZE: usize = 12; @@ -259,10 +261,11 @@ impl GarblingCoordinator { /// Jobs submitted via [`submit`](Self::submit) are collected into batches, /// sessions are created via the `factory`, and workers process them /// concurrently. 
- pub fn new( + pub(crate) fn new( config: GarblingConfig, factory: Arc, completion_tx: kanal::AsyncSender, + fault_tx: kanal::AsyncSender, ) -> Self { let (submit_tx, submit_rx) = kanal::bounded_async(config.max_concurrent * 2); @@ -273,7 +276,13 @@ impl GarblingCoordinator { .enable_timer() .build() .expect("failed to build monoio runtime for garbling coordinator") - .block_on(coordinator_loop(config, factory, submit_rx, completion_tx)); + .block_on(coordinator_loop( + config, + factory, + submit_rx, + completion_tx, + fault_tx, + )); }) .expect("failed to spawn garbling coordinator thread"); @@ -319,6 +328,7 @@ async fn coordinator_loop( factory: Arc, submit_rx: kanal::AsyncReceiver, completion_tx: kanal::AsyncSender, + fault_tx: kanal::AsyncSender, ) { let span = tracing::info_span!( "job_scheduler.garbling_coordinator", @@ -450,6 +460,7 @@ async fn coordinator_loop( sessions, &mut workers, &completion_tx, + &fault_tx, &mut pending_retry, ) .instrument(tracing::info_span!( @@ -491,6 +502,7 @@ async fn run_pass( sessions: Vec, workers: &mut [WorkerHandle], completion_tx: &kanal::AsyncSender, + fault_tx: &kanal::AsyncSender, pending_retry: &mut Vec, ) { let n_workers = workers.len(); @@ -545,6 +557,7 @@ async fn run_pass( &mut active_jobs_by_worker, workers, completion_tx, + fault_tx, pending_retry, ) .await; @@ -615,6 +628,7 @@ async fn run_pass( &mut active_jobs_by_worker, workers, completion_tx, + fault_tx, Duration::from_secs(60), pending_retry, ) @@ -680,6 +694,7 @@ async fn collect_finish_reports( active_jobs_by_worker: &mut HashMap>, workers: &mut [WorkerHandle], completion_tx: &kanal::AsyncSender, + fault_tx: &kanal::AsyncSender, pending_retry: &mut Vec, ) { collect_finish_reports_with_timeout( @@ -687,6 +702,7 @@ async fn collect_finish_reports( active_jobs_by_worker, workers, completion_tx, + fault_tx, Duration::from_secs(60), pending_retry, ) @@ -700,6 +716,7 @@ async fn collect_finish_reports_with_timeout( active_jobs_by_worker: &mut HashMap>, 
workers: &mut [WorkerHandle], completion_tx: &kanal::AsyncSender, + fault_tx: &kanal::AsyncSender, finish_timeout: Duration, pending_retry: &mut Vec, ) { @@ -719,11 +736,18 @@ async fn collect_finish_reports_with_timeout( Some(WorkerReport::FinishDone(report)) => { active_jobs_by_worker.remove(&wid); for completion in report.completions { + let peer_id = completion.peer_id; if completion_tx.send(completion).await.is_err() { tracing::error!( worker = wid, "completion channel closed while forwarding finish report" ); + let _ = fault_tx + .send(SchedulerFault::CompletionChannelClosed { + source: "garbling_coordinator", + peer_id, + }) + .await; return; } } @@ -756,7 +780,7 @@ async fn collect_finish_reports_with_timeout( /// /// Receives sessions and chunk commands from the coordinator's main thread. /// Processes sessions sequentially per chunk (parallel across workers). -/// Sends completions directly to the SM via `completion_tx`. +/// Reports completed results back to the coordinator. async fn worker_loop( id: usize, chunk_timeout: Duration, @@ -959,7 +983,7 @@ fn convert_block(block: &Block, num_gates: usize) -> OwnedBlock { #[cfg(test)] mod tests { - use std::{future::Future, pin::Pin, sync::Arc}; + use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc, time::Duration}; use mosaic_cac_types::{ GarblingSeed, Seed, @@ -1045,6 +1069,7 @@ mod tests { }]; let (completion_tx, _completion_rx) = kanal::bounded_async(1); + let (fault_tx, _fault_rx) = kanal::bounded_async(1); let mut pending_retry = Vec::new(); run_pass( &GarblingConfig { @@ -1057,6 +1082,7 @@ mod tests { sessions, &mut workers, &completion_tx, + &fault_tx, &mut pending_retry, ) .await; @@ -1123,6 +1149,7 @@ mod tests { let mut active_jobs_by_worker = HashMap::from([(0usize, vec![sample_job(5)])]); let (completion_tx, completion_rx) = kanal::bounded_async(2); + let (fault_tx, _fault_rx) = kanal::bounded_async(1); let mut pending_retry = Vec::new(); collect_finish_reports( @@ -1130,6 +1157,7 
@@ mod tests { &mut active_jobs_by_worker, &mut workers, &completion_tx, + &fault_tx, &mut pending_retry, ) .await; @@ -1162,6 +1190,7 @@ mod tests { let job = sample_job(6); let mut active_jobs_by_worker = HashMap::from([(0usize, vec![job.clone()])]); let (completion_tx, _completion_rx) = kanal::bounded_async(1); + let (fault_tx, _fault_rx) = kanal::bounded_async(1); let mut pending_retry = Vec::new(); collect_finish_reports_with_timeout( @@ -1169,6 +1198,7 @@ mod tests { &mut active_jobs_by_worker, &mut workers, &completion_tx, + &fault_tx, Duration::from_millis(1), &mut pending_retry, ) @@ -1179,4 +1209,70 @@ mod tests { assert_eq!(pending_retry[0].peer_id, job.peer_id); }); } + + #[test] + fn closed_completion_channel_reports_scheduler_fault() { + run_monoio(async { + let peer_id = PeerId::from([11; 32]); + let (command_tx, _command_rx) = kanal::bounded_async(2); + let (report_tx, report_rx) = kanal::bounded_async(2); + let (completion_tx, completion_rx) = kanal::bounded_async(1); + let (fault_tx, fault_rx) = kanal::bounded_async(1); + drop(completion_rx); + + report_tx + .send(WorkerReport::FinishDone(FinishReport { + completions: vec![JobCompletion { + peer_id, + completion: ActionCompletion::Garbler { + id: ActionId::SendCommitMsgChunk(0), + result: ActionResult::CommitMsgChunkAcked, + }, + }], + retry_jobs: vec![], + })) + .await + .expect("send finish report"); + + let mut workers = vec![WorkerHandle { + id: 0, + command_tx, + report_rx, + thread: None, + }]; + let mut active_jobs_by_worker = HashMap::from([( + 0usize, + vec![PendingCircuitJob { + peer_id, + action: CircuitAction::GarblerTransfer { + seed: GarblingSeed::from([5; 32]), + }, + }], + )]); + let mut pending_retry = Vec::new(); + + collect_finish_reports_with_timeout( + &[0], + &mut active_jobs_by_worker, + &mut workers, + &completion_tx, + &fault_tx, + Duration::from_millis(1), + &mut pending_retry, + ) + .await; + + let fault = monoio::time::timeout(Duration::from_secs(2), fault_rx.recv()) + 
.await + .expect("timed out waiting for scheduler fault") + .expect("fault channel should stay open"); + assert!(matches!( + fault, + SchedulerFault::CompletionChannelClosed { + source: "garbling_coordinator", + peer_id: fault_peer, + } if fault_peer == peer_id + )); + }); + } } diff --git a/crates/job/scheduler/src/lib.rs b/crates/job/scheduler/src/lib.rs index ef3c5dbe..6c26ff50 100644 --- a/crates/job/scheduler/src/lib.rs +++ b/crates/job/scheduler/src/lib.rs @@ -20,6 +20,16 @@ pub mod scheduler; pub(crate) mod priority; +use mosaic_net_svc_api::PeerId; + +#[derive(Debug, Clone)] +pub(crate) enum SchedulerFault { + CompletionChannelClosed { + source: &'static str, + peer_id: PeerId, + }, +} + // Re-export the API crate for convenience. pub use garbling::GarblingConfig; pub use mosaic_job_api; diff --git a/crates/job/scheduler/src/pool/mod.rs b/crates/job/scheduler/src/pool/mod.rs index 260f4481..daed09c4 100644 --- a/crates/job/scheduler/src/pool/mod.rs +++ b/crates/job/scheduler/src/pool/mod.rs @@ -17,7 +17,7 @@ use self::{ queue::JobQueue, worker::{Worker, WorkerJob}, }; -use crate::priority::Priority; +use crate::{SchedulerFault, priority::Priority}; /// A job waiting in the pool's shared queue. 
/// @@ -91,6 +91,7 @@ impl JobThreadPool { config: PoolConfig, dispatcher: Arc, completion_tx: kanal::AsyncSender, + fault_tx: kanal::AsyncSender, ) -> Self { let queue = Arc::new(JobQueue::new(config.priority_queue)); @@ -101,6 +102,7 @@ impl JobThreadPool { Arc::clone(&dispatcher), Arc::clone(&queue), completion_tx.clone(), + fault_tx.clone(), config.concurrency_per_worker, ) }) diff --git a/crates/job/scheduler/src/pool/worker.rs b/crates/job/scheduler/src/pool/worker.rs index 6c14f052..1488822b 100644 --- a/crates/job/scheduler/src/pool/worker.rs +++ b/crates/job/scheduler/src/pool/worker.rs @@ -20,6 +20,7 @@ use mosaic_net_svc_api::PeerId; use tracing::Instrument; use super::{PoolJob, queue::JobQueue}; +use crate::SchedulerFault; /// Initial backoff delay before requeuing a job that returned /// [`ExecuteResult::Retry`]. @@ -101,6 +102,7 @@ impl Worker { dispatcher: Arc, queue: Arc, completion_tx: kanal::AsyncSender, + fault_tx: kanal::AsyncSender, concurrency: usize, ) -> Self { let handle = std::thread::Builder::new() @@ -115,6 +117,7 @@ impl Worker { dispatcher, queue, completion_tx, + fault_tx, concurrency, )); }) @@ -156,6 +159,7 @@ async fn worker_loop( dispatcher: Arc, queue: Arc, completion_tx: kanal::AsyncSender, + fault_tx: kanal::AsyncSender, concurrency: usize, ) { // Permit pool: bounded channel pre-filled with `concurrency` tokens. @@ -188,6 +192,7 @@ async fn worker_loop( let dispatcher = Arc::clone(&dispatcher); let queue = Arc::clone(&queue); let completion_tx = completion_tx.clone(); + let fault_tx = fault_tx.clone(); let permit_tx = permit_tx.clone(); // 3. Spawn local task. 
The permit is returned when the task completes, regardless of @@ -199,7 +204,20 @@ async fn worker_loop( match result { ExecuteResult::Complete(completion) => { tracing::debug!("worker job completed"); - let _ = completion_tx.send(*completion).await; + if completion_tx.send(*completion).await.is_err() { + tracing::error!( + worker = id, + ?peer_id, + "completion channel closed; signaling fatal scheduler fault" + ); + queue.close(); + let _ = fault_tx + .send(SchedulerFault::CompletionChannelClosed { + source: "pool_worker", + peer_id, + }) + .await; + } } ExecuteResult::Retry => { let mut job = pool_job; @@ -373,3 +391,287 @@ async fn dispatch_evaluator( } } } + +#[cfg(test)] +#[allow(clippy::manual_async_fn)] +mod tests { + use std::{future::Future, sync::Arc, time::Duration}; + + use mosaic_cac_types::{ + AdaptorMsgChunk, ChallengeMsg, ChallengeResponseMsgHeader, CommitMsgHeader, DepositId, + GarblingSeed, Index, Seed, TableTransferReceiptMsg, TableTransferRequestMsg, + state_machine::{ + evaluator::ChunkIndex, + garbler::{self, Action as GarblerAction, Wire}, + }, + }; + use mosaic_job_api::{ + ActionCompletion, CircuitError, CircuitSession, ExecuteEvaluatorJob, ExecuteGarblerJob, + HandlerOutcome, OwnedChunk, + }; + + use super::*; + use crate::priority::Priority; + + struct DummySession; + + impl CircuitSession for DummySession { + fn process_chunk( + &mut self, + _chunk: &Arc, + ) -> std::pin::Pin> + Send + '_>> { + Box::pin(async { Ok(()) }) + } + + fn finish( + self: Box, + ) -> std::pin::Pin + Send>> { + Box::pin(async { HandlerOutcome::Retry }) + } + } + + #[derive(Clone, Copy)] + struct TestDispatcher; + + impl ExecuteGarblerJob for TestDispatcher { + type Session = DummySession; + + fn generate_polynomial_commitments( + &self, + _peer_id: &PeerId, + seed: Seed, + wire: Wire, + ) -> impl Future + Send { + async move { + HandlerOutcome::Done(ActionCompletion::Garbler { + id: garbler::ActionId::GeneratePolynomialCommitments(seed, wire), + result: 
garbler::ActionResult::CommitMsgChunkAcked, + }) + } + } + + fn generate_shares( + &self, + _peer_id: &PeerId, + _seed: Seed, + _index: Index, + ) -> impl Future + Send { + async { HandlerOutcome::Retry } + } + + fn send_commit_msg_header( + &self, + _peer_id: &PeerId, + _header: &CommitMsgHeader, + ) -> impl Future + Send { + async { HandlerOutcome::Retry } + } + + fn send_commit_msg_chunk( + &self, + _peer_id: &PeerId, + wire_idx: u16, + ) -> impl Future + Send { + async move { + HandlerOutcome::Done(ActionCompletion::Garbler { + id: garbler::ActionId::SendCommitMsgChunk(wire_idx), + result: garbler::ActionResult::CommitMsgChunkAcked, + }) + } + } + + fn send_challenge_response_header( + &self, + _peer_id: &PeerId, + _header: &ChallengeResponseMsgHeader, + ) -> impl Future + Send { + async { HandlerOutcome::Retry } + } + + fn send_challenge_response_chunk( + &self, + _peer_id: &PeerId, + _index: &Index, + ) -> impl Future + Send { + async { HandlerOutcome::Retry } + } + + fn deposit_verify_adaptors( + &self, + _peer_id: &PeerId, + _deposit_id: DepositId, + ) -> impl Future + Send { + async { HandlerOutcome::Retry } + } + + fn complete_adaptor_signatures( + &self, + _peer_id: &PeerId, + _deposit_id: DepositId, + ) -> impl Future + Send { + async { HandlerOutcome::Retry } + } + + fn begin_table_commitment( + &self, + _peer_id: &PeerId, + _index: Index, + _seed: GarblingSeed, + ) -> impl Future> + Send { + async { Ok(DummySession) } + } + + fn begin_table_transfer( + &self, + _peer_id: &PeerId, + _seed: GarblingSeed, + ) -> impl Future> + Send { + async { Ok(DummySession) } + } + } + + impl ExecuteEvaluatorJob for TestDispatcher { + type Session = DummySession; + + fn send_challenge_msg( + &self, + _peer_id: &PeerId, + _msg: &ChallengeMsg, + ) -> impl Future + Send { + async { HandlerOutcome::Retry } + } + + fn verify_opened_input_shares( + &self, + _peer_id: &PeerId, + ) -> impl Future + Send { + async { HandlerOutcome::Retry } + } + + fn 
send_table_transfer_request( + &self, + _peer_id: &PeerId, + _msg: &TableTransferRequestMsg, + ) -> impl Future + Send { + async { HandlerOutcome::Retry } + } + + fn send_table_transfer_receipt( + &self, + _peer_id: &PeerId, + _msg: &TableTransferReceiptMsg, + ) -> impl Future + Send { + async { HandlerOutcome::Retry } + } + + fn generate_deposit_adaptors( + &self, + _peer_id: &PeerId, + _deposit_id: DepositId, + ) -> impl Future + Send { + async { HandlerOutcome::Retry } + } + + fn generate_withdrawal_adaptors_chunk( + &self, + _peer_id: &PeerId, + _deposit_id: DepositId, + _chunk_idx: &ChunkIndex, + ) -> impl Future + Send { + async { HandlerOutcome::Retry } + } + + fn deposit_send_adaptor_msg_chunk( + &self, + _peer_id: &PeerId, + _deposit_id: DepositId, + _chunk: &AdaptorMsgChunk, + ) -> impl Future + Send { + async { HandlerOutcome::Retry } + } + + fn receive_garbling_table( + &self, + _peer_id: &PeerId, + _commitment: mosaic_cac_types::GarblingTableCommitment, + ) -> impl Future + Send { + async { HandlerOutcome::Retry } + } + + fn begin_table_commitment( + &self, + _peer_id: &PeerId, + _index: Index, + _seed: GarblingSeed, + ) -> impl Future> + Send { + async { Ok(DummySession) } + } + + fn begin_evaluation( + &self, + _peer_id: &PeerId, + _index: Index, + _commitment: mosaic_cac_types::GarblingTableCommitment, + ) -> impl Future> + Send { + async { Ok(DummySession) } + } + } + + fn run_monoio(future: F) + where + F: Future + 'static, + { + monoio::RuntimeBuilder::::new() + .enable_timer() + .build() + .expect("build monoio runtime") + .block_on(future); + } + + #[test] + fn closed_completion_channel_reports_scheduler_fault() { + run_monoio(async { + let peer_id = PeerId::from([9; 32]); + let queue = Arc::new(JobQueue::new(false)); + let dispatcher = Arc::new(TestDispatcher); + let (completion_tx, completion_rx) = kanal::bounded_async(1); + let (fault_tx, fault_rx) = kanal::bounded_async(1); + drop(completion_rx); + + let worker = 
monoio::spawn(worker_loop( + 0, + dispatcher, + Arc::clone(&queue), + completion_tx, + fault_tx, + 1, + )); + + queue.push(PoolJob { + priority: Priority::Normal, + job: WorkerJob::Garbler { + peer_id, + action: GarblerAction::SendCommitMsgChunk(0), + }, + attempts: 0, + }); + + let fault = monoio::time::timeout(Duration::from_secs(2), fault_rx.recv()) + .await + .expect("timed out waiting for scheduler fault") + .expect("fault channel should stay open"); + assert!(matches!( + fault, + SchedulerFault::CompletionChannelClosed { + source: "pool_worker", + peer_id: fault_peer, + } if fault_peer == peer_id + )); + + queue.close(); + monoio::time::timeout(Duration::from_secs(2), worker) + .await + .expect("timed out waiting for worker shutdown"); + }); + } +} diff --git a/crates/job/scheduler/src/scheduler.rs b/crates/job/scheduler/src/scheduler.rs index b1186a3d..a6c23a58 100644 --- a/crates/job/scheduler/src/scheduler.rs +++ b/crates/job/scheduler/src/scheduler.rs @@ -19,6 +19,7 @@ use mosaic_net_svc_api::PeerId; use tracing::Instrument; use crate::{ + SchedulerFault, garbling::{GarblingConfig, GarblingCoordinator}, pool::{JobThreadPool, PoolConfig, worker::WorkerJob}, priority::Priority, @@ -74,6 +75,8 @@ pub struct JobScheduler { garbling: Option, /// Receives batch submissions from the SM Scheduler. submission_rx: kanal::AsyncReceiver, + /// Internal fatal faults reported by workers/coordinator. + fault_rx: kanal::AsyncReceiver, } /// Controller for graceful scheduler shutdown. @@ -127,6 +130,7 @@ impl JobScheduler { // Channel for Job Scheduler → SM Scheduler (completed results). let (completion_tx, completion_rx) = kanal::bounded_async(config.completion_queue_size); + let (fault_tx, fault_rx) = kanal::bounded_async(16); let executor = Arc::new(dispatcher); @@ -135,9 +139,19 @@ impl JobScheduler { // We erase the concrete type so the coordinator is not generic over D. 
let factory: Arc = Arc::clone(&executor) as Arc; - let light = JobThreadPool::new(config.light, Arc::clone(&executor), completion_tx.clone()); - let heavy = JobThreadPool::new(config.heavy, Arc::clone(&executor), completion_tx.clone()); - let garbling = GarblingCoordinator::new(config.garbling, factory, completion_tx); + let light = JobThreadPool::new( + config.light, + Arc::clone(&executor), + completion_tx.clone(), + fault_tx.clone(), + ); + let heavy = JobThreadPool::new( + config.heavy, + Arc::clone(&executor), + completion_tx.clone(), + fault_tx.clone(), + ); + let garbling = GarblingCoordinator::new(config.garbling, factory, completion_tx, fault_tx); @@ -146,6 +160,7 @@ impl JobScheduler { heavy: Some(heavy), garbling: Some(garbling), submission_rx, + fault_rx, }; (scheduler, handle) @@ -204,6 +219,22 @@ } } } + recv = this.fault_rx.recv() => { + match recv { + Ok(SchedulerFault::CompletionChannelClosed { source, peer_id }) => { + tracing::error!( + source, + peer = ?peer_id, + "job scheduler completion delivery failed; shutting down fail-closed" + ); + break; + } + Err(_) => { + tracing::debug!("job scheduler fault channel closed; all job workers exited"); + break; + } + } + } } } diff --git a/crates/sm-executor/src/lib.rs b/crates/sm-executor/src/lib.rs index 2b9cb615..39149eb4 100644 --- a/crates/sm-executor/src/lib.rs +++ b/crates/sm-executor/src/lib.rs @@ -1,6 +1,6 @@ //! SM executor implementation. -use std::{panic::AssertUnwindSafe, thread::JoinHandle}; +use std::{collections::VecDeque, panic::AssertUnwindSafe, thread::JoinHandle, time::Duration}; use fasm::{Input as FasmInput, StateMachine}; use futures::FutureExt; @@ -19,6 +19,52 @@ use mosaic_sm_executor_api::{ use mosaic_storage_api::{Commit, StorageProviderError, StorageProviderMut}; use tracing::Instrument; +/// Initial backoff before retrying a completion that failed to apply. 
+const COMPLETION_RETRY_BACKOFF_BASE: Duration = Duration::from_millis(100); + +/// Maximum backoff between repeated completion-application attempts. +const COMPLETION_RETRY_BACKOFF_MAX: Duration = Duration::from_secs(10); + +/// Compute exponential backoff with cap: `min(base * 2^attempts, max)`. +fn completion_retry_backoff(attempts: u32) -> Duration { + let multiplier = 1u32.checked_shl(attempts).unwrap_or(u32::MAX); + let backoff = COMPLETION_RETRY_BACKOFF_BASE.saturating_mul(multiplier); + backoff.min(COMPLETION_RETRY_BACKOFF_MAX) +} + +#[derive(Debug)] +struct PendingJobCompletion { + completion: JobCompletion, + attempts: u32, +} + +impl PendingJobCompletion { + fn new(completion: JobCompletion) -> Self { + Self { + completion, + attempts: 0, + } + } + + fn role(&self) -> SmRole { + completion_role(&self.completion.completion) + } + + fn action_id(&self) -> String { + match &self.completion.completion { + ActionCompletion::Garbler { id, .. } => format!("{id:?}"), + ActionCompletion::Evaluator { id, .. } => format!("{id:?}"), + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum CompletionErrorAction { + Requeue, + Drop, + Fatal, +} + /// SM executor error. 
#[derive(Debug, thiserror::Error)] pub enum SmExecutorError { @@ -196,6 +242,7 @@ where .name("sm-executor".to_string()) .spawn(move || { let mut runtime = monoio::RuntimeBuilder::::new() + .enable_timer() .build() .expect("failed to build sm-executor monoio runtime"); let result = runtime.block_on(self.run_inner(Some(shutdown_rx))); @@ -221,6 +268,7 @@ where ); async move { let shutdown_rx = shutdown_rx; + let mut pending_completions: VecDeque = VecDeque::new(); tracing::info!("sm executor starting"); self.restore_known_peers().await?; tracing::info!("sm executor restore completed; entering main loop"); @@ -234,6 +282,9 @@ where let mut command_fut = Box::pin(self.command_rx.recv()); loop { + let retry_delay = pending_completions + .front() + .map(|completion| completion_retry_backoff(completion.attempts)); monoio::select! { shutdown = &mut shutdown_fut => { match shutdown { @@ -244,18 +295,41 @@ where None => unreachable!("shutdown receiver helper never returns None"), } } + _ = async { + match retry_delay { + Some(delay) => monoio::time::sleep(delay).await, + None => std::future::pending::<()>().await, + } + } => { + let completion = pending_completions + .pop_front() + .expect("retry branch only fires when a completion is pending"); + tracing::debug!( + source = "job_completion_retry", + peer = ?completion.completion.peer_id, + role = ?completion.role(), + attempts = completion.attempts, + "retrying queued job completion" + ); + if let Err(err) = self.process_job_completion(&mut pending_completions, completion).await { + tracing::error!(source = "job_completion_retry", error = ?err, "fatal queued completion processing error; stopping sm executor"); + return Err(err); + } + } completion = &mut completion_fut => { completion_fut = Box::pin(self.job_handle.recv()); match completion { Ok(c) => { - let role = completion_role(&c.completion); - tracing::debug!(source = "job_completion", peer = ?c.peer_id, role = ?role, "received job completion"); - if let Err(err) = 
self.handle_job_completion(c).await { - if Self::is_fatal_processing_error(&err) { - tracing::error!(source = "job_completion", error = ?err, "fatal completion handling error; stopping sm executor"); - return Err(err); - } - tracing::warn!(source = "job_completion", error = ?err, "job completion handling failed; dropping completion"); + let completion = PendingJobCompletion::new(c); + tracing::debug!( + source = "job_completion", + peer = ?completion.completion.peer_id, + role = ?completion.role(), + "received job completion" + ); + if let Err(err) = self.process_job_completion(&mut pending_completions, completion).await { + tracing::error!(source = "job_completion", error = ?err, "fatal completion handling error; stopping sm executor"); + return Err(err); } } Err(_) => { @@ -319,6 +393,20 @@ where .await } + async fn process_job_completion( + &self, + pending_completions: &mut VecDeque, + completion: PendingJobCompletion, + ) -> Result<(), SmExecutorError> { + if let Err(err) = self + .handle_job_completion(completion.completion.clone()) + .await + { + return Self::handle_failed_completion(pending_completions, completion, err); + } + Ok(()) + } + async fn restore_known_peers(&self) -> Result<(), SmExecutorError> { let span = tracing::info_span!( "sm_executor.restore_known_peers", @@ -979,6 +1067,57 @@ where fn is_fatal_processing_error(err: &SmExecutorError) -> bool { matches!(err, SmExecutorError::JobSubmission { .. 
}) } + + fn handle_failed_completion( + pending_completions: &mut VecDeque, + mut completion: PendingJobCompletion, + err: SmExecutorError, + ) -> Result<(), SmExecutorError> { + match Self::completion_error_action(&err) { + CompletionErrorAction::Fatal => Err(err), + CompletionErrorAction::Requeue => { + completion.attempts = completion.attempts.saturating_add(1); + tracing::warn!( + source = "job_completion", + error = ?err, + peer = ?completion.completion.peer_id, + role = ?completion.role(), + action_id = %completion.action_id(), + attempts = completion.attempts, + backoff_ms = completion_retry_backoff(completion.attempts).as_millis(), + "job completion handling failed; requeueing completion" + ); + pending_completions.push_back(completion); + Ok(()) + } + CompletionErrorAction::Drop => { + tracing::warn!( + source = "job_completion", + error = ?err, + peer = ?completion.completion.peer_id, + role = ?completion.role(), + action_id = %completion.action_id(), + "job completion handling failed; dropping completion" + ); + Ok(()) + } + } + } + + fn completion_error_action(err: &SmExecutorError) -> CompletionErrorAction { + match err { + SmExecutorError::Storage { .. } | SmExecutorError::Commit { .. } => { + CompletionErrorAction::Requeue + } + SmExecutorError::Stf { .. } => CompletionErrorAction::Drop, + SmExecutorError::JobSubmission { .. } + | SmExecutorError::RoleMismatch(_) + | SmExecutorError::StfPanic { .. } + | SmExecutorError::SourceClosed(_) + | SmExecutorError::NetRecv(_) + | SmExecutorError::Ack { .. 
} => CompletionErrorAction::Fatal, + } + } } fn msg_kind(msg: &Msg) -> &'static str { @@ -1041,17 +1180,24 @@ fn command_kind(kind: &SmCommandKind) -> &'static str { mod tests { use std::{ future::Future, - sync::Arc, + sync::{ + Arc, + atomic::{AtomicBool, Ordering}, + }, task::{Context, Poll}, + time::Duration, }; use ark_serialize::{CanonicalSerialize, Compress, SerializationError}; use ed25519_dalek::SigningKey; use futures::task::noop_waker_ref; use mosaic_cac_types::{ - ChallengeIndices, ChallengeMsg, HeapArray, Index, Msg, + AllGarblingTableCommitments, ChallengeIndices, ChallengeMsg, HeapArray, Index, Msg, state_machine::{ - evaluator::{self, EvaluatorInitData, StateRead as EvaluatorStateRead}, + evaluator::{ + self, EvaluatorInitData, StateMut as EvaluatorStateMut, + StateRead as EvaluatorStateRead, + }, garbler::StateMut as GarblerStateMut, }, }; @@ -1062,9 +1208,12 @@ mod tests { api::{NetCommand, StreamRequest}, }; use mosaic_sm_executor_api::{InitData, SmTarget}; - use mosaic_storage_api::{Commit, StorageProvider, StorageProviderMut}; + use mosaic_storage_api::{Commit, StorageProvider, StorageProviderMut, StorageProviderResult}; use mosaic_storage_inmemory::{ - InMemoryStorageProvider, evaluator::StoredEvaluatorState, garbler::StoredGarblerState, + InMemoryStorageProvider, + evaluator::StoredEvaluatorState, + garbler::StoredGarblerState, + provider::{InMemoryEvaluatorSession, InMemoryGarblerSession}, }; use super::*; @@ -1093,11 +1242,75 @@ mod tests { } } + #[derive(Debug, Clone)] + struct FailOnceEvaluatorStateMutProvider { + inner: InMemoryStorageProvider, + fail_next_evaluator_state_mut: Arc, + } + + impl FailOnceEvaluatorStateMutProvider { + fn new() -> Self { + Self { + inner: InMemoryStorageProvider::new(), + fail_next_evaluator_state_mut: Arc::new(AtomicBool::new(true)), + } + } + } + + impl StorageProvider for FailOnceEvaluatorStateMutProvider { + type GarblerState = StoredGarblerState; + type EvaluatorState = StoredEvaluatorState; + + fn 
garbler_state( + &self, + peer_id: &PeerId, + ) -> impl Future> + Send { + self.inner.garbler_state(peer_id) + } + + fn evaluator_state( + &self, + peer_id: &PeerId, + ) -> impl Future> + Send { + self.inner.evaluator_state(peer_id) + } + } + + impl StorageProviderMut for FailOnceEvaluatorStateMutProvider { + type GarblerState = InMemoryGarblerSession; + type EvaluatorState = InMemoryEvaluatorSession; + + fn garbler_state_mut( + &self, + peer_id: &PeerId, + ) -> impl Future> { + self.inner.garbler_state_mut(peer_id) + } + + fn evaluator_state_mut( + &self, + peer_id: &PeerId, + ) -> impl Future> { + let inner = self.inner.clone(); + let peer_id = *peer_id; + let fail_next = Arc::clone(&self.fail_next_evaluator_state_mut); + async move { + if fail_next.swap(false, Ordering::AcqRel) { + return Err(StorageProviderError::Other( + "transient evaluator state acquisition failure".into(), + )); + } + inner.evaluator_state_mut(&peer_id).await + } + } + } + fn run_monoio(future: F) where F: Future + 'static, { monoio::RuntimeBuilder::::new() + .enable_timer() .build() .expect("build monoio runtime") .block_on(future); @@ -1274,6 +1487,197 @@ mod tests { assert!(!non_fatal); } + #[test] + fn completion_error_policy_requeues_transient_failures_and_drops_stf_errors() { + let peer_id = PeerId::from([6; 32]); + + let storage_err = + SmExecutor::::completion_error_action(&SmExecutorError::Storage { + peer_id, + role: SmRole::Evaluator, + source: StorageProviderError::Other("temporary".into()), + }); + assert_eq!(storage_err, CompletionErrorAction::Requeue); + + let commit_err = + SmExecutor::::completion_error_action(&SmExecutorError::Commit { + peer_id, + role: SmRole::Garbler, + reason: "temporary".into(), + }); + assert_eq!(commit_err, CompletionErrorAction::Requeue); + + let stf_err = SmExecutor::::completion_error_action(&SmExecutorError::Stf { + peer_id, + role: SmRole::Evaluator, + source: SMError::UnexpectedInput, + }); + assert_eq!(stf_err, CompletionErrorAction::Drop); + 
+ let submission_err = + SmExecutor::::completion_error_action(&SmExecutorError::JobSubmission { + peer_id, + source: mosaic_job_api::SchedulerStopped, + }); + assert_eq!(submission_err, CompletionErrorAction::Fatal); + } + + #[test] + fn transient_completion_failure_is_requeued() { + let peer_id = PeerId::from([7; 32]); + let completion = PendingJobCompletion::new(JobCompletion { + peer_id, + completion: ActionCompletion::Evaluator { + id: evaluator::ActionId::VerifyOpenedInputShares, + result: evaluator::ActionResult::VerifyOpenedInputSharesResult(None), + }, + }); + let mut pending = VecDeque::new(); + + SmExecutor::::handle_failed_completion( + &mut pending, + completion, + SmExecutorError::Commit { + peer_id, + role: SmRole::Evaluator, + reason: "temporary".into(), + }, + ) + .expect("commit failures should be requeued"); + + let queued = pending.pop_front().expect("completion should be queued"); + assert_eq!(queued.attempts, 1); + assert_eq!(queued.completion.peer_id, peer_id); + assert!(matches!( + queued.completion.completion, + ActionCompletion::Evaluator { + id: evaluator::ActionId::VerifyOpenedInputShares, + .. 
+ } + )); + } + + #[test] + fn stf_completion_failure_is_dropped() { + let peer_id = PeerId::from([11; 32]); + let completion = PendingJobCompletion::new(JobCompletion { + peer_id, + completion: ActionCompletion::Evaluator { + id: evaluator::ActionId::VerifyOpenedInputShares, + result: evaluator::ActionResult::VerifyOpenedInputSharesResult(None), + }, + }); + let mut pending = VecDeque::new(); + + SmExecutor::::handle_failed_completion( + &mut pending, + completion, + SmExecutorError::Stf { + peer_id, + role: SmRole::Evaluator, + source: SMError::UnexpectedInput, + }, + ) + .expect("stf failures should be dropped, not crash the executor"); + + assert!(pending.is_empty(), "stf failures should not be requeued"); + } + + #[test] + fn executor_loop_retries_completion_after_transient_storage_failure() { + run_monoio(async { + let provider = FailOnceEvaluatorStateMutProvider::new(); + let peer_id = PeerId::from([10; 32]); + + { + let mut state = provider + .inner + .evaluator_state_mut(&peer_id) + .await + .expect("acquire evaluator state"); + state + .put_root_state(&evaluator::EvaluatorState { + config: None, + step: evaluator::Step::VerifyingOpenedInputShares, + }) + .await + .expect("write root state"); + state + .put_challenge_indices(&ChallengeIndices::new(|i| { + Index::new(i + 1).expect("valid challenge index") + })) + .await + .expect("write challenge indices"); + state + .put_opened_garbling_seeds(&mosaic_cac_types::OpenedGarblingSeeds::new(|_| { + [3; 32].into() + })) + .await + .expect("write opened seeds"); + state + .put_garbling_table_commitments(&AllGarblingTableCommitments::new(|_| { + [4; 32].into() + })) + .await + .expect("write table commitments"); + state.commit().await.expect("commit seeded evaluator state"); + } + + let (job_handle, submit_rx, completion_tx) = make_job_handle(); + let (net_client, _protocol_tx) = make_net_client(); + let (executor, _handle) = SmExecutor::new( + SmExecutorConfig::default(), + provider.clone(), + job_handle, + 
net_client, + ); + let (shutdown_tx, shutdown_rx) = kanal::bounded_async(1); + let executor_task = + monoio::spawn(async move { executor.run_inner(Some(shutdown_rx)).await }); + + completion_tx + .send(JobCompletion { + peer_id, + completion: ActionCompletion::Evaluator { + id: evaluator::ActionId::VerifyOpenedInputShares, + result: evaluator::ActionResult::VerifyOpenedInputSharesResult(None), + }, + }) + .await + .expect("send completion"); + + let submitted = monoio::time::timeout(Duration::from_secs(2), submit_rx.recv()) + .await + .expect("timed out waiting for retried completion") + .expect("job batch submitted"); + assert_eq!(submitted.peer_id, peer_id); + assert!(submitted.actions.is_evaluator()); + assert!( + !submitted.actions.is_empty(), + "completion retry should emit follow-up evaluator work" + ); + + let committed = provider + .evaluator_state(&peer_id) + .await + .expect("acquire evaluator state") + .get_root_state() + .await + .expect("read evaluator state") + .expect("committed evaluator state should exist"); + assert!(matches!( + committed.step, + evaluator::Step::VerifyingTableCommitments { .. } + )); + + shutdown_tx.send(()).await.expect("send shutdown"); + monoio::time::timeout(Duration::from_secs(2), executor_task) + .await + .expect("timed out waiting for executor shutdown") + .expect("executor exits cleanly"); + }); + } + #[test] fn inbound_stf_failure_does_not_send_ack() { run_monoio(async {